Files
server-manager/tools/ssh.py
chrome-storm-c442 c21b263b24 v1.9.25: show server group in --list and --info CLI output
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 06:55:32 -05:00

1903 lines
73 KiB
Python

#!/usr/bin/env python3
"""
SSH utility for Claude Code — connects to servers by alias.
Credentials stored locally in servers.json (encrypted), NEVER exposed to AI API.
Usage (SSH):
python ssh.py ALIAS "command" # run as configured user (auto-sudo if needed)
python ssh.py ALIAS --no-sudo "command" # run without sudo elevation
python ssh.py ALIAS --upload LOCAL REMOTE
python ssh.py ALIAS --download REMOTE LOCAL
python ssh.py ALIAS --install-key
python ssh.py ALIAS --ping
python ssh.py --list
python ssh.py --status
python ssh.py --info ALIAS # full info (no passwords)
python ssh.py --set-note ALIAS "desc" # update server notes
python ssh.py --add ALIAS IP PORT USER PASSWORD [--type ssh|telnet|mariadb|mssql|postgresql|redis|grafana|prometheus|winrm|rdp|s3] [--note "desc"] [--database DB] [--token TOKEN]
python ssh.py --remove ALIAS
SQL (type: mariadb / mssql / postgresql):
python ssh.py --sql ALIAS "SELECT * FROM users" # execute SQL query
python ssh.py --sql-databases ALIAS # list databases
python ssh.py --sql-tables ALIAS [database] # list tables
Redis (type: redis):
python ssh.py --redis ALIAS "GET mykey" # execute Redis command
python ssh.py --redis-info ALIAS # Redis INFO
python ssh.py --redis-keys ALIAS "user:*" # SCAN keys by pattern
Grafana (type: grafana):
python ssh.py --grafana-dashboards ALIAS # list dashboards
python ssh.py --grafana-alerts ALIAS # list alerts
Prometheus (type: prometheus):
python ssh.py --prom-query ALIAS "up" # execute PromQL query
python ssh.py --prom-targets ALIAS # list targets
python ssh.py --prom-alerts ALIAS # list alerts
S3 (type: s3):
python ssh.py --s3-buckets ALIAS # list buckets
python ssh.py --s3-ls ALIAS [bucket[/prefix]] # list objects
python ssh.py --s3-upload ALIAS local bucket/key # upload file
python ssh.py --s3-download ALIAS bucket/key local # download file
python ssh.py --s3-delete ALIAS bucket/key # delete object
python ssh.py --s3-url ALIAS bucket/key [SEC] # presigned URL (default 3600s)
python ssh.py --s3-create-bucket ALIAS name # create bucket
WinRM (type: winrm):
python ssh.py --ps ALIAS "Get-Process" # PowerShell via WinRM
python ssh.py --cmd ALIAS "dir" # CMD via WinRM
"""
import sys
import os
import json
import time
import hashlib
import paramiko
# Shared config — same file used by ServerManager GUI
SHARED_DIR = os.path.expanduser("~/.server-connections")
SETTINGS_FILE = os.path.join(SHARED_DIR, "settings.json")
DEFAULT_SERVERS_FILE = os.path.join(SHARED_DIR, "servers.json")
# Local private key tried first by get_client(); install_key() uploads the
# matching "<path>.pub" file.
SSH_KEY_PATH = os.path.expanduser("~/.ssh/id_ed25519")
SSH_CONFIG_PATH = os.path.expanduser("~/.ssh/config")
# Encryption support — encryption.py is copied to SHARED_DIR by GUI setup
if SHARED_DIR not in sys.path:
    # Prepend so the helper module in SHARED_DIR can be imported below.
    sys.path.insert(0, SHARED_DIR)
try:
    from encryption import decrypt, encrypt, is_encrypted
    HAS_ENCRYPTION = True
except ImportError:
    # Helper not available: load_servers()/save_servers() fall back to plain JSON.
    HAS_ENCRYPTION = False
def _get_servers_file() -> str:
    """Return the servers file named in settings.json, else the default path.

    The configured ``servers_path`` is only honoured when it is non-empty and
    the file it points to exists.
    """
    if not os.path.exists(SETTINGS_FILE):
        return DEFAULT_SERVERS_FILE
    try:
        with open(SETTINGS_FILE, "r", encoding="utf-8") as fh:
            configured = json.load(fh).get("servers_path", "")
        if configured and os.path.exists(configured):
            return configured
    except Exception:
        # Best effort: an unreadable or malformed settings file means
        # "use the default location".
        pass
    return DEFAULT_SERVERS_FILE
# ── Data ──────────────────────────────────────────────
def load_servers():
    """Load the servers file.

    Returns:
        A ``(data, by_alias)`` tuple: the full parsed JSON document and a
        dict mapping each server's ``alias`` to its entry.
    """
    with open(_get_servers_file(), "rb") as fh:
        blob = fh.read()
    # Decrypt only when the helper module is importable and recognises the
    # payload; otherwise the bytes are treated as UTF-8 JSON.
    if HAS_ENCRYPTION and is_encrypted(blob):
        payload = decrypt(blob)
    else:
        payload = blob.decode("utf-8")
    data = json.loads(payload)
    by_alias = {}
    for entry in data.get("servers", []):
        by_alias[entry["alias"]] = entry
    return data, by_alias
def _group_map(data: dict) -> dict:
"""Map group UUID → group name."""
return {g["id"]: g.get("name", "") for g in data.get("groups", [])}
def save_servers(data):
    """Serialize *data* as indented JSON and write it to the servers file.

    The payload is encrypted before writing when the encryption helper is
    available; otherwise it is written as UTF-8 text.
    """
    target = _get_servers_file()
    serialized = json.dumps(data, indent=2, ensure_ascii=False)
    if not HAS_ENCRYPTION:
        with open(target, "w", encoding="utf-8") as fh:
            fh.write(serialized)
        return
    # Encrypt before opening the file so a failing encrypt() does not
    # truncate the existing file.
    blob = encrypt(serialized)
    with open(target, "wb") as fh:
        fh.write(blob)
# ── Connection ────────────────────────────────────────
def get_client(server: dict) -> paramiko.SSHClient:
    """Open an SSH connection to *server* and return the connected client.

    Authentication order:
      1. the local private key at ``SSH_KEY_PATH`` (if the file exists);
      2. the ``password`` stored in the server entry.

    Args:
        server: server entry; reads ``ip``, ``port`` (default 22), ``user``
            (default "root"), ``password`` and ``alias``.

    Returns:
        A connected ``paramiko.SSHClient``; the caller must close it.

    Raises:
        Exception: "No auth method for <alias>" when key auth was not
            possible or failed and no password is stored (chained to the
            key-auth error when there was one).
        Any exception raised by ``connect()`` during password auth.
    """
    def _new_client():
        c = paramiko.SSHClient()
        # Unknown host keys are accepted automatically.
        c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        return c

    def _harden_transport(c):
        transport = c.get_transport()
        if transport is not None:
            transport.set_keepalive(15)
            transport.default_window_size = 4 * 1024 * 1024

    kwargs = {
        "hostname": server["ip"],
        "port": server.get("port", 22),
        "username": server.get("user", "root"),
        "timeout": 15,
        "banner_timeout": 15,
    }

    # Try key first
    key_error = None
    if os.path.exists(SSH_KEY_PATH):
        client = _new_client()
        try:
            client.connect(key_filename=SSH_KEY_PATH, **kwargs)
            _harden_transport(client)
            return client
        except Exception as exc:
            # Any failure means "fall back to password", but release the
            # half-open client first so it does not leak.
            key_error = exc
            client.close()

    # Fallback to password
    password = server.get("password", "")
    if password:
        client = _new_client()
        try:
            client.connect(password=password, look_for_keys=False,
                           allow_agent=False, **kwargs)
            _harden_transport(client)
        except Exception:
            client.close()
            raise
        return client

    raise Exception(f"No auth method for {server['alias']}") from key_error
# ── Command execution ─────────────────────────────────
def run_command(server: dict, command: str, use_sudo: bool = True) -> tuple:
    """Execute *command* on *server* over SSH.

    If the configured user is not root and ``use_sudo`` is true, the command
    is wrapped in ``sudo -S ... bash -c`` and the stored password is fed
    through stdin (not visible in the process list). For servers detected as
    Windows the command is translated instead and sudo is never used.

    Args:
        server: server entry (see ``get_client``).
        command: command line to run.
        use_sudo: allow automatic sudo elevation for non-root users.

    Returns:
        ``(stdout_text, stderr_text, exit_code)``; sudo prompt noise is
        removed from stderr.
    """
    client = get_client(server)
    try:
        user = server.get("user", "root")
        is_win = _is_windows_server(server)
        need_sudo = not is_win and use_sudo and user != "root"
        if is_win:
            # Windows SSH: translate Linux commands to Windows equivalents
            full_cmd = _sanitize_windows_command(command)
        elif need_sudo:
            # Use sudo -S to read password from stdin
            # -p '' suppresses the password prompt text
            full_cmd = f"sudo -S -p '' bash -c {_shell_quote(command)}"
        else:
            full_cmd = command
        stdin, stdout, stderr = client.exec_command(full_cmd, timeout=120)
        if need_sudo:
            # A missing or null password is sent as an empty line.
            stdin.write((server.get("password") or "") + "\n")
            stdin.flush()
        # Drain the output BEFORE asking for the exit status: waiting for the
        # status first can hang forever once the remote side has produced
        # more output than the channel window holds.
        out = stdout.read().decode("utf-8", errors="replace")
        err = stderr.read().decode("utf-8", errors="replace")
        exit_code = stdout.channel.recv_exit_status()
        # Strip sudo noise from stderr
        err_lines = [line for line in err.splitlines()
                     if not line.startswith("[sudo]")
                     and "password for" not in line.lower()]
        err = "\n".join(err_lines).strip()
        return out, err, exit_code
    finally:
        client.close()
def _shell_quote(s: str) -> str:
"""Safely quote a string for bash -c."""
return "'" + s.replace("'", "'\\''") + "'"
def _is_windows_server(server: dict) -> bool:
"""Detect if server is Windows by alias, notes, or type."""
stype = server.get("type", "ssh")
if stype in ("winrm", "rdp"):
return True
text = (server.get("alias", "") + " " + server.get("notes", "")).lower()
return "windows" in text or "win " in text
# ── Windows command translation ──────────────────────
# Map of Linux commands → Windows/PowerShell equivalents
# Each entry: (regex_pattern, replacement_callable)
# Patterns use ^\s* anchoring — matches only at command start, no partial-word risk
_WIN_CMD_MAP = [
# --- Skip / warn ---
(r'^\s*chmod\b.*', lambda m: 'echo [SKIP] chmod not supported on Windows'),
(r'^\s*chown\b.*', lambda m: 'echo [SKIP] chown not supported on Windows'),
# --- Environment ---
(r'^\s*export\s+(\w+)=(.*)', lambda m: f'set {m.group(1)}={m.group(2)}'),
(r'^\s*env\s*$', lambda m: 'set'),
(r'^\s*printenv\s*$', lambda m: 'set'),
(r'^\s*printenv\s+(\w+)', lambda m: f'echo %{m.group(1)}%'),
# --- File operations ---
(r'^\s*ls\s+-la\s+(.*)', lambda m: f'dir /a "{m.group(1).strip()}"'),
(r'^\s*ls\s+-lah?\s+(.*)', lambda m: f'dir /a "{m.group(1).strip()}"'),
(r'^\s*ls\s+-[a-zA-Z]*R[a-zA-Z]*\s*(.*)', lambda m: f'dir /s /a "{m.group(1).strip()}"' if m.group(1).strip() else 'dir /s /a'),
(r'^\s*ls\s+-[a-zA-Z]+\s+(.*)', lambda m: f'dir "{m.group(1).strip()}"'),
(r'^\s*ls\s+-[a-zA-Z]+\s*$', lambda m: 'dir'),
(r'^\s*ls\s+(.*)', lambda m: f'dir "{m.group(1).strip()}"'),
(r'^\s*ls\s*$', lambda m: 'dir'),
(r'^\s*cat\b\s+(.*)', lambda m: f'type {m.group(1).strip()}'),
(r'^\s*cp\s+-r\s+(.*?)\s+(.*)', lambda m: f'xcopy /E /I "{m.group(1).strip()}" "{m.group(2).strip()}"'),
(r'^\s*cp\s+(.*?)\s+(.*)', lambda m: f'copy "{m.group(1).strip()}" "{m.group(2).strip()}"'),
(r'^\s*mv\s+(.*?)\s+(.*)', lambda m: f'move "{m.group(1).strip()}" "{m.group(2).strip()}"'),
(r'^\s*rm\s+-rf?\s+(.*)', lambda m: f'rmdir /S /Q "{m.group(1).strip()}"'),
(r'^\s*rm\s+-f\s+(.*)', lambda m: f'del /F /Q "{m.group(1).strip()}"'),
(r'^\s*rm\s+(.*)', lambda m: f'del /Q "{m.group(1).strip()}"'),
(r'^\s*mkdir\s+-p\s+(.*)', lambda m: f'mkdir "{m.group(1).strip()}"'),
(r'^\s*touch\s+(.*)', lambda m: f'type nul > "{m.group(1).strip()}"'),
(r'^\s*pwd\s*$', lambda m: 'cd'),
# --- Text processing (PowerShell) ---
(r'^\s*grep\s+-r\s+"?([^"]*)"?\s+(.*)', lambda m: f'powershell -Command "Get-ChildItem -Recurse {m.group(2).strip()} | Select-String -Pattern \'{m.group(1)}\'"'),
(r'^\s*grep\s+-i\s+"?([^"]*)"?\s+(.*)', lambda m: f'findstr /I "{m.group(1)}" {m.group(2).strip()}'),
(r'^\s*grep\s+"?([^"]*)"?\s+(.*)', lambda m: f'findstr "{m.group(1)}" {m.group(2).strip()}'),
(r'^\s*head\s+-n?\s*(\d+)\s+(.*)', lambda m: f'powershell -Command "Get-Content \'{m.group(2).strip()}\' | Select-Object -First {m.group(1)}"'),
(r'^\s*head\s+(.*)', lambda m: f'powershell -Command "Get-Content \'{m.group(1).strip()}\' | Select-Object -First 10"'),
(r'^\s*tail\s+-n?\s*(\d+)\s+(.*)', lambda m: f'powershell -Command "Get-Content \'{m.group(2).strip()}\' -Tail {m.group(1)}"'),
(r'^\s*tail\s+-f\s+(.*)', lambda m: f'powershell -Command "Get-Content \'{m.group(1).strip()}\' -Wait -Tail 20"'),
(r'^\s*tail\s+(.*)', lambda m: f'powershell -Command "Get-Content \'{m.group(1).strip()}\' -Tail 10"'),
(r'^\s*wc\s+-l\s+(.*)', lambda m: f'powershell -Command "(Get-Content \'{m.group(1).strip()}\' | Measure-Object -Line).Lines"'),
# --- Search ---
(r'^\s*find\s+(.*?)\s+-name\s+"?([^"]*)"?', lambda m: f'powershell -Command "Get-ChildItem -Path \'{m.group(1).strip()}\' -Recurse -Filter \'{m.group(2)}\'"'),
(r'^\s*which\s+(.*)', lambda m: f'where.exe {m.group(1).strip()}'),
(r'^\s*whereis\s+(.*)', lambda m: f'where.exe {m.group(1).strip()}'),
# --- Process / system ---
(r'^\s*ps\s+aux\s*$', lambda m: 'tasklist'),
(r'^\s*ps\s+-ef\s*$', lambda m: 'tasklist'),
(r'^\s*ps\s*$', lambda m: 'tasklist'),
(r'^\s*kill\s+-9\s+(\d+)', lambda m: f'taskkill /F /PID {m.group(1)}'),
(r'^\s*kill\s+(\d+)', lambda m: f'taskkill /PID {m.group(1)}'),
(r'^\s*top\s*$', lambda m: 'powershell -Command "Get-Process | Sort-Object CPU -Descending | Select-Object -First 20 Name, Id, CPU, @{N=\'Mem(MB)\';E={[math]::Round($_.WS/1MB,1)}}"'),
(r'^\s*df\s*', lambda m: 'powershell -Command "Get-PSDrive -PSProvider FileSystem | Select-Object Name, @{N=\'Used(GB)\';E={[math]::Round($_.Used/1GB,1)}}, @{N=\'Free(GB)\';E={[math]::Round($_.Free/1GB,1)}}"'),
(r'^\s*free\b.*', lambda m: 'powershell -Command "$os=Get-CimInstance Win32_OperatingSystem; [PSCustomObject]@{TotalMB=[math]::Round($os.TotalVisibleMemorySize/1024); FreeMB=[math]::Round($os.FreePhysicalMemory/1024); UsedMB=[math]::Round(($os.TotalVisibleMemorySize-$os.FreePhysicalMemory)/1024)} | Format-List"'),
(r'^\s*du\s+-sh?\s+(.*)', lambda m: f'powershell -Command "(Get-ChildItem -Recurse \'{m.group(1).strip()}\' | Measure-Object -Property Length -Sum).Sum / 1MB | ForEach-Object {{Write-Host ([math]::Round($_,1)) \'MB\'}}"'),
(r'^\s*uname\b.*', lambda m: 'powershell -Command "[System.Environment]::OSVersion | Format-List"'),
(r'^\s*uptime\s*$', lambda m: 'powershell -Command "$b=(Get-CimInstance Win32_OperatingSystem).LastBootUpTime; $u=(New-TimeSpan $b (Get-Date)); Write-Host \"up $($u.Days)d $($u.Hours)h $($u.Minutes)m, since $b\""'),
(r'^\s*whoami\s*$', lambda m: 'whoami'),
(r'^\s*hostname\s*$', lambda m: 'hostname'),
# --- Service management ---
(r'^\s*systemctl\s+status\s+(.*)', lambda m: f'powershell -Command "Get-Service \'{m.group(1).strip()}\' | Format-List"'),
(r'^\s*systemctl\s+start\s+(.*)', lambda m: f'powershell -Command "Start-Service \'{m.group(1).strip()}\'"'),
(r'^\s*systemctl\s+stop\s+(.*)', lambda m: f'powershell -Command "Stop-Service \'{m.group(1).strip()}\'"'),
(r'^\s*systemctl\s+restart\s+(.*)', lambda m: f'powershell -Command "Restart-Service \'{m.group(1).strip()}\'"'),
(r'^\s*systemctl\s+list-units\b.*', lambda m: 'powershell -Command "Get-Service | Format-Table Name, Status, DisplayName -AutoSize"'),
# --- Networking ---
(r'^\s*ifconfig\s*$', lambda m: 'ipconfig'),
(r'^\s*ip\s+addr\b.*', lambda m: 'ipconfig /all'),
(r'^\s*ip\s+route\b.*', lambda m: 'route print'),
(r'^\s*netstat\b(.*)', lambda m: f'netstat{m.group(1)}'),
(r'^\s*ss\s+-tulnp\s*$', lambda m: 'netstat -ano'),
(r'^\s*curl\s+(.*)', lambda m: f'powershell -Command "Invoke-WebRequest -Uri \'{m.group(1).strip()}\' -UseBasicParsing | Select-Object StatusCode, Content"'),
(r'^\s*wget\s+(.*)', lambda m: f'powershell -Command "Invoke-WebRequest -Uri \'{m.group(1).strip()}\' -OutFile ([System.IO.Path]::GetFileName(\'{m.group(1).strip()}\'))"'),
]
def _strip_sudo(command: str) -> str:
"""Strip sudo and its flags from a command. Handles -u USER, -S, -p '', etc."""
# sudo flags that consume the next argument
_SUDO_ARG_FLAGS = {'-u', '-g', '-C', '-p', '-r', '-t', '-D'}
parts = command.strip().split()
if not parts or parts[0] != 'sudo':
return command
i = 1
while i < len(parts):
if parts[i].startswith('-'):
flag = parts[i]
i += 1
if flag in _SUDO_ARG_FLAGS and i < len(parts):
i += 1 # skip argument
else:
break
return ' '.join(parts[i:]) if i < len(parts) else ''
def _sanitize_windows_command(command: str) -> str:
"""Translate Linux commands to Windows/PowerShell equivalents for Windows SSH servers.
Features:
- Translates common Linux commands (ls, cat, grep, ps, df, etc.)
- Wraps pipe chains in PowerShell when needed
- Forces UTF-8 via chcp 65001
- Passes through native Windows/PowerShell commands unchanged
- Auto-removes sudo, skips chmod/chown with warning
"""
import re
# 0. Empty / whitespace guard
if not command or not command.strip():
return command
# 1. Strip sudo early (before any other logic)
if re.match(r'^\s*sudo\b', command):
command = _strip_sudo(command)
if not command:
return command
# 2. Handle && chains — split, translate each, rejoin (before passthrough!)
if ' && ' in command:
parts = command.split(' && ')
translated = [_translate_single_command(p.strip()) for p in parts]
joined = ' && '.join(translated)
return f"chcp 65001 >nul && {joined}"
# 3. Handle pipes (before passthrough!)
if '|' in command:
return _translate_piped_command(command)
# 4. Passthrough — already Windows/PowerShell native (single commands only)
stripped = command.strip()
passthrough_prefixes = ('powershell ', 'powershell.exe ', 'pwsh ', 'pwsh.exe ',
'cmd /c ', 'cmd.exe /c ', 'chcp ', 'dir ', 'type ',
'copy ', 'move ', 'del ', 'mkdir ', 'rmdir ',
'tasklist', 'taskkill', 'ipconfig', 'netstat',
'systeminfo', 'where.exe', 'findstr ', 'echo ',
'set ', 'reg ', 'sc ', 'wmic ',
'Get-', 'Set-', 'New-', 'Remove-', 'Start-', 'Stop-',
'Restart-', 'Invoke-', 'Select-', 'Write-', 'Out-',
'Format-', 'Test-', 'Import-', 'Export-')
stripped_lower = stripped.lower()
for prefix in passthrough_prefixes:
if stripped_lower.startswith(prefix.lower()):
return f"chcp 65001 >nul && {command}"
# 5. Single command translation
translated = _translate_single_command(command)
return f"chcp 65001 >nul && {translated}"
def _translate_single_command(command: str) -> str:
    """Translate one (non-piped) Linux command via ``_WIN_CMD_MAP``.

    The first table entry whose pattern matches the stripped command and
    whose builder succeeds wins. When nothing applies, *command* is returned
    exactly as given.
    """
    import re

    candidate = command.strip()
    for pattern, build in _WIN_CMD_MAP:
        hit = re.match(pattern, candidate, re.IGNORECASE)
        if hit is None:
            continue
        try:
            return build(hit)
        except Exception:
            # A failing builder is skipped; later entries may still match.
            continue
    return command  # No match — pass through as-is
def _translate_piped_command(command: str) -> str:
"""Translate a piped Linux command chain to PowerShell."""
import re
# If it already contains PowerShell cmdlets, pass through
if re.search(r'Get-|Set-|Select-|Where-|ForEach-|Measure-', command):
return f"chcp 65001 >nul && {command}"
# Split by pipe, translate first command, check if we need PowerShell wrapping
parts = [p.strip() for p in command.split('|')]
# Try translating the first segment
first_translated = _translate_single_command(parts[0])
# If the pipe chain mixes cmd and grep/awk/sed — wrap entire thing in PowerShell
needs_ps = any(re.match(r'\s*(grep|awk|sed|sort|uniq|wc|head|tail|cut|tr)\b', p, re.IGNORECASE)
for p in parts[1:])
if needs_ps:
# Build a PowerShell pipeline
ps_parts = [_linux_to_ps_pipe_segment(p.strip()) for p in parts]
ps_cmd = ' | '.join(ps_parts)
return f'chcp 65001 >nul && powershell -Command "{ps_cmd}"'
# Otherwise just join translated parts
translated_parts = [first_translated] + [_translate_single_command(p) for p in parts[1:]]
return f"chcp 65001 >nul && {' | '.join(translated_parts)}"
def _linux_to_ps_pipe_segment(segment: str) -> str:
"""Convert a single pipe segment from Linux to PowerShell equivalent."""
import re
s = segment.strip()
# cat → Get-Content
m = re.match(r'cat\b\s+(.*)', s, re.IGNORECASE)
if m:
return f"Get-Content '{m.group(1).strip()}'"
# ps aux/ps -ef → Get-Process
if re.match(r'ps\s+(aux|-ef)\s*$', s, re.IGNORECASE):
return 'Get-Process'
# grep → Select-String
m = re.match(r'grep\s+(-i\s+)?"?([^"]*)"?', s, re.IGNORECASE)
if m:
return f"Select-String -Pattern '{m.group(2)}'"
# sort
if re.match(r'sort\s*$', s, re.IGNORECASE):
return 'Sort-Object'
# sort -r / sort -n
if re.match(r'sort\s+-r', s, re.IGNORECASE):
return 'Sort-Object -Descending'
if re.match(r'sort\s+-n', s, re.IGNORECASE):
return 'Sort-Object {[int]$_}'
# uniq
if re.match(r'uniq\s*$', s, re.IGNORECASE):
return 'Get-Unique'
# wc -l
if re.match(r'wc\s+-l', s, re.IGNORECASE):
return 'Measure-Object -Line'
# head -n N
m = re.match(r'head\s+(-n\s*)?(\d+)', s, re.IGNORECASE)
if m:
return f'Select-Object -First {m.group(2)}'
if re.match(r'head\s*$', s, re.IGNORECASE):
return 'Select-Object -First 10'
# tail -n N
m = re.match(r'tail\s+(-n\s*)?(\d+)', s, re.IGNORECASE)
if m:
return f'Select-Object -Last {m.group(2)}'
if re.match(r'tail\s*$', s, re.IGNORECASE):
return 'Select-Object -Last 10'
# awk — basic field extraction
m = re.match(r"awk\s+['\"]?\{print\s+\$(\d+)\}['\"]?", s, re.IGNORECASE)
if m:
idx = int(m.group(1)) - 1
return f"ForEach-Object {{ ($_ -split '\\s+')[{idx}] }}"
# cut -d'X' -f N
m = re.match(r"cut\s+-d['\"]?(.)['\"]?\s+-f(\d+)", s, re.IGNORECASE)
if m:
idx = int(m.group(2)) - 1
return f"ForEach-Object {{ ($_ -split '{m.group(1)}')[{idx}] }}"
# sed — basic s/old/new/
m = re.match(r"sed\s+['\"]?s/([^/]*)/([^/]*)/?[g]?['\"]?", s, re.IGNORECASE)
if m:
return f"ForEach-Object {{ $_ -replace '{m.group(1)}', '{m.group(2)}' }}"
# tr -d 'X'
m = re.match(r"tr\s+-d\s+['\"]?(.+?)['\"]?\s*$", s, re.IGNORECASE)
if m:
return f"ForEach-Object {{ $_ -replace '[{m.group(1)}]', '' }}"
# Fallback — return as-is
return s
# ── File transfer ─────────────────────────────────────
def _normalize_remote_path(remote_path: str) -> str:
"""Normalize remote path by detecting and fixing MSYS path conversions."""
# Strip leading // that user adds to prevent Git Bash MSYS path conversion
# //C:/Temp → /C:/Temp
if remote_path.startswith("//") and len(remote_path) > 2 and remote_path[2] != "/":
remote_path = remote_path[1:]
# If the path looks like a Windows path that was converted by MSYS, fix it back
if ':' in remote_path and ('Program Files/Git' in remote_path or (len(remote_path) > 3 and remote_path[1] == ':' and remote_path[2] == '/')):
# Convert C:/Program Files/Git/tmp/file.txt back to /tmp/file.txt
# Find the position where Git path starts
if 'Program Files/Git' in remote_path:
git_pos = remote_path.find('Program Files/Git')
if git_pos != -1:
# Extract the part after Program Files/Git
actual_path = remote_path[git_pos + len('Program Files/Git'):]
return actual_path
# If it's just a drive letter followed by :, convert it too
if len(remote_path) > 3 and remote_path[1] == ':' and remote_path[2] == '/':
# This is a Windows-style path like C:/something
# Try to determine if it's supposed to be a Unix path
potential_unix_path = remote_path[3:] # Remove drive prefix like "C:"
# If the resulting path starts with a common Unix directory, assume it should be Unix path
common_unix_prefixes = ['/tmp/', '/home/', '/etc/', '/var/', '/usr/', '/opt/', '/root/', '/bin/', '/sbin/', '/lib/', '/lib64/']
for prefix in common_unix_prefixes:
if potential_unix_path.startswith(prefix):
return potential_unix_path
return remote_path
def _fmt_size(nbytes: int) -> str:
"""Format byte count for human display."""
if nbytes < 1024:
return f"{nbytes} B"
elif nbytes < 1024 * 1024:
return f"{nbytes / 1024:.1f} KB"
elif nbytes < 1024 * 1024 * 1024:
return f"{nbytes / (1024 * 1024):.1f} MB"
else:
return f"{nbytes / (1024 * 1024 * 1024):.2f} GB"
def _progress_cb(total_bytes: int):
"""Return a Paramiko-compatible progress callback.
For files >= 1 MB, prints at 25%, 50%, 75% milestones.
For files < 1 MB, stays silent."""
threshold = 1024 * 1024 # 1 MB
reported = set()
def callback(transferred: int, total: int):
if total < threshold:
return
pct = int(transferred * 100 / total)
for milestone in (25, 50, 75):
if pct >= milestone and milestone not in reported:
reported.add(milestone)
print(f"{milestone}% ({_fmt_size(transferred)}/{_fmt_size(total)})")
return callback
# Uploads strictly larger than this go through _upload_resumable(); smaller
# ones use a single SFTP put.
RESUME_THRESHOLD = 10 * 1024 * 1024 # >10MB → chunked resume
# Bytes read from the local file and written to the remote file per iteration.
CHUNK_SIZE = 256 * 1024 # 256KB per write
# Lower bound for the number of upload attempts; _upload_resumable() raises it
# for large files.
MAX_RETRIES = 5
def _sha256_local(path: str) -> str:
"""SHA256 hash of a local file."""
h = hashlib.sha256()
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(1024 * 1024), b''):
h.update(chunk)
return h.hexdigest()
def _sha256_remote(client, remote_path: str, is_windows: bool = False) -> str | None:
"""SHA256 hash of a remote file via exec.
Returns None if sha256sum is unavailable."""
if is_windows:
# SFTP path may start with / (e.g. /C:/Users/...) — strip for PowerShell
win_path = remote_path
if win_path.startswith('/') and len(win_path) > 2 and win_path[2] == ':':
win_path = win_path[1:]
escaped = win_path.replace('"', '`"')
cmd = f'powershell -Command "(Get-FileHash -Path \\"{escaped}\\" -Algorithm SHA256).Hash"'
else:
escaped = remote_path.replace("'", "'\\''")
cmd = (f"sha256sum '{escaped}' 2>/dev/null || "
f"shasum -a 256 '{escaped}' 2>/dev/null")
try:
stdin, stdout, stderr = client.exec_command(cmd, timeout=120)
output = stdout.read().decode().strip()
exit_code = stdout.channel.recv_exit_status()
if exit_code == 0 and output:
return output.split()[0].lower()
except Exception:
pass
return None
def upload_file(server: dict, local_path: str, remote_path: str):
    """Upload *local_path* to *remote_path* on *server*.

    The remote path is normalized first. Files larger than
    ``RESUME_THRESHOLD`` use the resumable uploader, all others the simple one.
    """
    target = _normalize_remote_path(remote_path)
    size = os.path.getsize(local_path)
    uploader = _upload_resumable if size > RESUME_THRESHOLD else _upload_simple
    uploader(server, local_path, target, size)
def _upload_simple(server, local_path, remote_path, file_size):
    """Upload a file in a single SFTP ``put`` (used for files <=10MB).

    After the transfer the remote mode is set to 0o664 when the server allows
    it, and a one-line summary is printed.
    """
    client = get_client(server)
    try:
        sftp = client.open_sftp()
        started = time.time()
        sftp.put(local_path, remote_path, callback=_progress_cb(file_size))
        duration = time.time() - started
        try:
            sftp.chmod(remote_path, 0o664)
        except OSError:
            # chmod is a nicety; an OSError from it is ignored.
            pass
        sftp.close()
        _print_result(server, local_path, remote_path, file_size, duration)
    finally:
        client.close()
def _upload_resumable(server, local_path, remote_path, file_size):
    """Chunked upload with resume, retry, rename-into-place and SHA256 verification.

    Data is written to ``<remote_path>.part``. Each attempt opens a fresh
    connection, continues from the current size of the ``.part`` file,
    validates size and (when a remote hash tool is available) SHA256, and only
    then renames the ``.part`` file to ``remote_path``.

    Args:
        server: server entry passed to ``get_client``.
        local_path: file to upload.
        remote_path: final remote path (already normalized by the caller).
        file_size: size of ``local_path`` in bytes.

    Raises:
        SystemExit: when the last attempt fails with a connection, SFTP or
            validation error.
    """
    tmp_path = remote_path + ".part"
    progress = _progress_cb(file_size)
    is_windows = _is_windows_server(server)
    t0 = time.time()
    # Adaptive retries: more attempts for larger files (unstable links need resume)
    # One extra attempt per 10 MB plus 3, capped at 30 and never below MAX_RETRIES.
    max_retries = max(MAX_RETRIES, min(file_size // (10 * 1024 * 1024) + 3, 30))
    for attempt in range(1, max_retries + 1):
        client = None
        sftp = None
        try:
            client = get_client(server)
            sftp = client.open_sftp()
            # How much is already uploaded?
            remote_offset = 0
            try:
                remote_offset = sftp.stat(tmp_path).st_size
                if remote_offset > file_size:
                    # A .part file larger than the source cannot be a valid
                    # prefix of it: discard and start over.
                    sftp.remove(tmp_path)
                    remote_offset = 0
            except FileNotFoundError:
                remote_offset = 0
            if 0 < remote_offset < file_size:
                print(f"Resume: {_fmt_size(remote_offset)}/{_fmt_size(file_size)} "
                      f"({remote_offset * 100 // file_size}%)")
            # Write remaining data
            if remote_offset < file_size:
                with open(local_path, 'rb') as f:
                    f.seek(remote_offset)
                    if remote_offset > 0:
                        # Continue an existing .part file at the resume offset.
                        rf = sftp.open(tmp_path, 'r+b')
                        rf.seek(remote_offset)
                    else:
                        rf = sftp.open(tmp_path, 'wb')
                    rf.set_pipelined(True)
                    try:
                        transferred = remote_offset
                        while transferred < file_size:
                            data = f.read(CHUNK_SIZE)
                            if not data:
                                # Local file ended early; the size check below
                                # reports the mismatch.
                                break
                            rf.write(data)
                            transferred += len(data)
                            progress(transferred, file_size)
                    finally:
                        rf.close()
            # === VALIDATE: size ===
            actual = sftp.stat(tmp_path).st_size
            if actual != file_size:
                raise IOError(f"Size mismatch: expected {file_size}, got {actual}")
            # === VALIDATE: SHA256 before rename (always, even if resumed) ===
            print("Verifying SHA256...", end=" ", flush=True)
            local_hash = _sha256_local(local_path)
            remote_hash = _sha256_remote(client, tmp_path, is_windows)
            if remote_hash is not None and local_hash != remote_hash:
                print(f"MISMATCH on attempt {attempt}", file=sys.stderr)
                # The .part content is wrong: delete it so the next attempt
                # does not resume from bad data.
                sftp.remove(tmp_path)
                if attempt < max_retries:
                    continue # Retry from scratch
                else:
                    raise IOError(
                        f"CHECKSUM MISMATCH after {max_retries} attempts!\n"
                        f" local: {local_hash}\n"
                        f" remote: {remote_hash}"
                    )
            elif remote_hash is not None:
                print(f"OK ({local_hash[:16]}...)")
            else:
                # _sha256_remote returned None: no hash could be obtained, so
                # only the size check above protects this upload.
                print("SKIP (sha256sum unavailable)")
            # Atomic rename: .part → final
            # The existing target is removed first; errors from the removal
            # (e.g. no such file) are ignored.
            try:
                sftp.remove(remote_path)
            except (FileNotFoundError, IOError):
                pass
            sftp.rename(tmp_path, remote_path)
            try:
                sftp.chmod(remote_path, 0o664)
            except OSError:
                pass
            elapsed = time.time() - t0
            _print_result(server, local_path, remote_path, file_size, elapsed)
            return # Success
        except (EOFError, TimeoutError, OSError,
                paramiko.SSHException, ConnectionError) as e:
            # IOError is an alias of OSError, so the size/checksum failures
            # raised above are retried through this handler as well.
            print(f"Attempt {attempt}/{max_retries} failed: {e}", file=sys.stderr)
            if attempt < max_retries:
                # Exponential backoff clamped to the 5..30 second range.
                delay = max(5, min(2 ** attempt, 30))
                print(f"Retry in {delay}s...", file=sys.stderr)
                time.sleep(delay)
            else:
                raise SystemExit(f"ERROR: Upload failed after {max_retries} attempts: {e}")
        finally:
            # Runs on success, on `continue` and on failure: every attempt
            # releases its own connection.
            if sftp:
                try: sftp.close()
                except Exception: pass
            if client:
                try: client.close()
                except Exception: pass
def _print_result(server, local_path, remote_path, file_size, elapsed):
    """Print the one-line upload summary: size, duration and, for >=1 MB, speed."""
    details = [_fmt_size(file_size), f"{elapsed:.1f}s"]
    # Throughput is only reported for files of 1 MB or more and needs a
    # non-zero duration.
    if file_size >= 1024 * 1024 and elapsed > 0:
        details.append(f"{_fmt_size(int(file_size / elapsed))}/s")
    print(f"OK: {local_path} -> {server['alias']}:{remote_path} ({', '.join(details)})")
def download_file(server: dict, remote_path: str, local_path: str):
    """Download *remote_path* from *server* to *local_path* over SFTP.

    The remote path is normalized first; a one-line summary with size,
    duration and (for >=1 MB) speed is printed on success.
    """
    source = _normalize_remote_path(remote_path)
    client = get_client(server)
    try:
        sftp = client.open_sftp()
        size = sftp.stat(source).st_size
        started = time.time()
        sftp.get(source, local_path, callback=_progress_cb(size))
        duration = time.time() - started
        sftp.close()
        details = [_fmt_size(size), f"{duration:.1f}s"]
        if size >= 1024 * 1024 and duration > 0:
            details.append(f"{_fmt_size(int(size / duration))}/s")
        print(f"OK: {server['alias']}:{source} -> {local_path} ({', '.join(details)})")
    finally:
        client.close()
def install_key(server: dict):
    """Append the local public key to the server's ``~/.ssh/authorized_keys``.

    Reads ``SSH_KEY_PATH + ".pub"``, skips the install when the key text is
    already present on the server, and otherwise creates ``~/.ssh`` (mode
    700) and appends the key (file mode 600). Commands run without sudo.

    Exits the process with status 1 when the public key file is missing or
    the remote install command does not confirm success.
    """
    pub_key_path = SSH_KEY_PATH + ".pub"
    if not os.path.exists(pub_key_path):
        print(f"ERROR: No public key at {pub_key_path}")
        sys.exit(1)
    with open(pub_key_path, "r") as f:
        pub_key = f.read().strip()

    # Single-quote the key so the remote shell passes it through verbatim.
    quoted_key = _shell_quote(pub_key)

    # Use explicit markers rather than `grep -c ... || echo 0`: with -c grep
    # prints "0" AND exits 1 when the file exists without the key, so the
    # fallback echo produced "0\n0" and the key was reported as installed.
    # -F matches the key as a fixed string, not as a regular expression.
    check_cmd = (
        f"grep -qF -- {quoted_key} ~/.ssh/authorized_keys 2>/dev/null "
        f"&& echo KEY_PRESENT || echo KEY_ABSENT"
    )
    out, _, _ = run_command(server, check_cmd, use_sudo=False)
    if "KEY_PRESENT" in out:
        print(f"Key already installed on {server['alias']}")
        return

    command = (
        f'mkdir -p ~/.ssh && chmod 700 ~/.ssh && '
        f'echo {quoted_key} >> ~/.ssh/authorized_keys && '
        f'chmod 600 ~/.ssh/authorized_keys && '
        f'echo "KEY_OK"'
    )
    out, err, code = run_command(server, command, use_sudo=False)
    if "KEY_OK" in out:
        print(f"SSH key installed on {server['alias']}")
    else:
        print(f"ERROR: {err or out}")
        sys.exit(1)
# ── Server management ─────────────────────────────────
def ping_server(server: dict):
    """Print ``<alias>: <status>`` for one server.

    Any exception from the connectivity check is reported as OFFLINE together
    with the exception class name.
    """
    try:
        result = _check_status_one(server)
        print(f"{server['alias']}: {result}")
    except Exception as exc:
        failure = type(exc).__name__
        print(f"{server['alias']}: OFFLINE ({failure})")
def list_servers(full=False):
    """Print a table of configured servers.

    Safe mode (default) shows alias, type, group, key availability and notes.
    Full mode additionally exposes IP, port and user and prints a warning
    first.
    """
    data, servers = load_servers()
    groups = _group_map(data)
    # The "Key" column reflects whether the LOCAL private key file exists, so
    # it is the same for every row.
    key_flag = "yes" if os.path.exists(SSH_KEY_PATH) else "no"

    if not full:
        # Safe mode: only aliases (no IPs, ports, users)
        print(f"{'Alias':<20} {'Type':<10} {'Group':<14} {'Key':<6} {'Notes'}")
        print("-" * 80)
        for alias, entry in servers.items():
            kind = entry.get("type", "ssh")
            group_name = groups.get(entry.get("group", ""), "-")
            notes = entry.get("notes", "")
            print(f"{alias:<20} {kind:<10} {group_name:<14} {key_flag:<6} {notes}")
        return

    # WARNING: full mode shows sensitive data (IP, port, user)
    # Only for local/manual use, NEVER through AI API
    print("WARNING: Full mode — contains sensitive data. Do NOT pipe to AI.")
    print(f"{'Alias':<20} {'IP':<20} {'Port':<8} {'User':<10} {'Key':<6}")
    print("-" * 64)
    for alias, entry in servers.items():
        print(f"{alias:<20} {entry['ip']:<20} {entry.get('port', 22):<8} {entry.get('user', 'root'):<10} {key_flag:<6}")
def _resolve_alias(alias: str, servers: dict) -> str:
"""Resolve alias — exact match, then whole-word search, then substring fallback."""
if alias in servers:
return alias
query = alias.lower()
# 1) Whole-word match (e.g. "tor" matches "API TOR contabo" but NOT "investor")
import re
word_re = re.compile(r'\b' + re.escape(query) + r'\b', re.IGNORECASE)
word_matches = [a for a in servers if word_re.search(a)]
if len(word_matches) == 1:
return word_matches[0]
if len(word_matches) > 1:
print(f"Ambiguous: '{alias}' matches multiple servers:")
for m in word_matches:
print(f" - {m}")
sys.exit(1)
# 2) Substring fallback (e.g. "cont" matches "contabo")
sub_matches = [a for a in servers if query in a.lower()]
if len(sub_matches) == 1:
return sub_matches[0]
if len(sub_matches) > 1:
print(f"Ambiguous: '{alias}' matches multiple servers:")
for m in sub_matches:
print(f" - {m}")
sys.exit(1)
print(f"Unknown: '{alias}'. Available: {', '.join(servers.keys())}")
sys.exit(1)
def server_info(alias: str):
    """Print details of one server that are safe for AI context.

    Shows alias, type, group (when set), local key availability, auth method,
    whether 2FA is configured, and notes (when set). IP, user, password, port
    and the TOTP secret are never printed.
    """
    data, servers = load_servers()
    groups = _group_map(data)
    entry = servers[_resolve_alias(alias, servers)]
    key_flag = "yes" if os.path.exists(SSH_KEY_PATH) else "no"

    lines = [
        f"Alias: {entry['alias']}",
        f"Type: {entry.get('type', 'ssh')}",
    ]
    group_name = groups.get(entry.get("group", ""), "")
    if group_name:
        lines.append(f"Group: {group_name}")
    lines.append(f"Key: {key_flag}")
    lines.append(f"Auth: {entry.get('auth', 'password')}")
    # Only the presence of a TOTP secret is revealed, never its value.
    lines.append(f"2FA: {'yes' if entry.get('totp_secret') else 'no'}")
    notes = entry.get("notes", "")
    if notes:
        lines.append(f"Notes: {notes}")
    for line in lines:
        print(line)
def _check_status_one(server: dict) -> str:
    """Check connectivity for a single server based on its type.

    Args:
        server: Server entry; ``type`` selects the probe (default ``ssh``).

    Returns:
        ``"ONLINE"`` when the probe succeeds. The WinRM probe returns
        ``"OFFLINE"`` itself when the test command exits non-zero.

    Raises:
        Whatever the underlying client raises when the target cannot be
        reached; ``check_status`` maps any exception to OFFLINE.
    """
    stype = server.get("type", "ssh")

    if stype in ("ssh", "telnet"):
        client = get_client(server)
        client.close()
        return "ONLINE"

    if stype in ("mariadb", "mysql", "mssql", "postgresql"):
        host = server["ip"]
        user = server.get("user", "root")
        password = server.get("password", "")
        database = server.get("database", "")
        # Each engine gets its own default port (MSSQL listens on 1433, not 3306).
        if stype in ("mariadb", "mysql"):
            import pymysql
            conn = pymysql.connect(host=host, port=server.get("port", 3306),
                                   user=user, password=password,
                                   database=database or None, connect_timeout=10)
        elif stype == "mssql":
            import pymssql
            conn = pymssql.connect(server=host, port=server.get("port", 1433),
                                   user=user, password=password,
                                   database=database or None, login_timeout=10)
        else:  # postgresql
            import psycopg2
            conn = psycopg2.connect(host=host, port=server.get("port", 5432),
                                    user=user, password=password,
                                    dbname=database or None, connect_timeout=10)
        conn.close()
        return "ONLINE"

    if stype == "redis":
        r = _get_redis_client(server)
        try:
            r.ping()
            return "ONLINE"
        finally:
            r.close()

    if stype == "grafana":
        import requests
        host = server["ip"]
        port = server.get("port", 3000)
        protocol = "https" if server.get("ssl", False) else "http"
        base_url = server.get("base_url", f"{protocol}://{host}:{port}")
        resp = requests.get(f"{base_url.rstrip('/')}/api/health", timeout=10,
                            verify=server.get("ssl_verify", True))
        resp.raise_for_status()
        return "ONLINE"

    if stype == "prometheus":
        import requests
        host = server["ip"]
        port = server.get("port", 9090)
        protocol = "https" if server.get("ssl", False) else "http"
        base_url = server.get("base_url", f"{protocol}://{host}:{port}")
        auth = None
        user = server.get("user", "")
        password = server.get("password", "")
        if user and password:
            auth = (user, password)
        resp = requests.get(f"{base_url.rstrip('/')}/api/v1/status/buildinfo",
                            auth=auth, timeout=10, verify=server.get("ssl_verify", True))
        resp.raise_for_status()
        return "ONLINE"

    if stype == "winrm":
        session = _get_winrm_session(server)
        result = session.run_cmd("echo ok")
        if result.status_code == 0:
            return "ONLINE"
        return "OFFLINE"

    # rdp/vnc (and any other type) — just a TCP connect.
    import socket
    port = server.get("port", 3389 if stype == "rdp" else 5900)
    # The context manager closes the socket even when connect() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(10)
        sock.connect((server["ip"], port))
    return "ONLINE"
def check_status():
    """Print a connectivity table (alias, type, status) for every server."""
    _, servers = load_servers()

    header = f"{'Alias':<20} {'Type':<12} {'Status':<10}"
    print(header)
    print("-" * 42)

    for name, entry in servers.items():
        kind = entry.get("type", "ssh")
        try:
            state = _check_status_one(entry)
        except Exception:
            # Best effort: any probe failure is reported as OFFLINE
            # rather than aborting the whole table.
            state = "OFFLINE"
        print(f"{name:<20} {kind:<12} {state:<10}")
def add_server(args):
    """Register a new server from the arguments that follow ``--add``.

    Args:
        args: ``[ALIAS, IP, PORT, USER, PASSWORD, ...options]`` where the
            options are ``--type``, ``--note``, ``--database`` and
            ``--token``, each followed by one value. Unrecognised tokens
            (or a trailing option without a value) are ignored.

    Exits with status 1 (after printing a message) when fewer than five
    positional arguments are given, the port is not an integer, the type is
    unknown, or the alias already exists.
    """
    valid_types = ["ssh", "telnet", "mariadb", "mssql", "postgresql", "redis",
                   "grafana", "prometheus", "winrm", "rdp", "s3"]

    if len(args) < 5:
        # The type list is generated from valid_types so the usage text
        # cannot drift from what is actually accepted.
        print(f"Usage: --add ALIAS IP PORT USER PASSWORD [--type {'|'.join(valid_types)}] "
              "[--note \"desc\"] [--database DB] [--token TOKEN]")
        sys.exit(1)

    alias, ip, port_text, user, password = args[:5]
    try:
        port = int(port_text)
    except ValueError:
        print(f"ERROR: Invalid port '{port_text}' (must be an integer)")
        sys.exit(1)

    # Parse optional arguments
    options = {"--type": "ssh", "--note": "", "--database": "", "--token": ""}
    i = 5
    while i < len(args):
        if args[i] in options and i + 1 < len(args):
            options[args[i]] = args[i + 1]
            i += 2
        else:
            i += 1
    stype = options["--type"]
    note = options["--note"]
    database = options["--database"]
    token = options["--token"]

    # Validate server type
    if stype not in valid_types:
        print(f"ERROR: Invalid server type '{stype}'. Valid types: {', '.join(valid_types)}")
        sys.exit(1)

    data, servers = load_servers()
    if alias in servers:
        print(f"ERROR: '{alias}' already exists")
        sys.exit(1)

    new_server = {
        "alias": alias,
        "type": stype,
        "ip": ip,
        "port": port,
        "user": user,
        "password": password,
        "notes": note,
    }

    # Add type-specific fields
    if stype in ("mariadb", "mssql", "postgresql"):
        if database:
            new_server["database"] = database
    elif stype in ("redis", "grafana", "prometheus"):
        if token:
            new_server["token"] = token
    elif stype == "s3":
        # S3: user=access_key, password=secret_key, ip=endpoint
        new_server["access_key"] = user
        new_server["secret_key"] = password
        new_server["use_ssl"] = True
        if database:
            new_server["bucket"] = database
    elif stype in ("winrm", "rdp"):
        # WinRM/RDP may have additional auth fields
        new_server["auth_method"] = "password"  # default
    elif stype == "ssh":
        new_server["auth"] = "ssh-key"  # default auth method

    data["servers"].append(new_server)
    save_servers(data)

    # Update SSH config only for SSH servers
    if stype == "ssh":
        update_ssh_config(alias, ip, port, user)
        try:
            install_key(new_server)
        except Exception as e:
            # Key installation is best effort; the server entry is already saved.
            print(f"Warning: key not installed ({e}). Run: ssh.py {alias} --install-key")

    print(f"Added: {alias} (type: {stype})")
def set_note(alias: str, note: str):
    """Replace the notes of one server — safe for AI (no credentials exposed).

    Args:
        alias: Exact or fuzzy server alias; resolved via ``_resolve_alias``.
        note: New notes text, stored under the entry's ``notes`` key.
    """
    data, servers = load_servers()
    target = _resolve_alias(alias, servers)

    # Only the first entry carrying the resolved alias is updated.
    entry = next((item for item in data["servers"] if item["alias"] == target), None)
    if entry is not None:
        entry["notes"] = note

    save_servers(data)
    print(f"OK: notes updated for {target}")
def remove_server(alias: str):
    """Delete a server entry and its ``Host`` block in the SSH config.

    Args:
        alias: Exact or fuzzy server alias; resolved via ``_resolve_alias``.
    """
    data, servers = load_servers()
    target = _resolve_alias(alias, servers)

    survivors = []
    for entry in data["servers"]:
        if entry["alias"] != target:
            survivors.append(entry)
    data["servers"] = survivors

    save_servers(data)
    remove_from_ssh_config(target)
    print(f"Removed: {target}")
# ── SSH config ────────────────────────────────────────
def update_ssh_config(alias, ip, port, user):
    """Append a ``Host`` block for *alias* to the SSH config file.

    Nothing is written when the config file does not exist or when it
    already contains a ``Host <alias>`` line.

    Args:
        alias: Name used on the ``Host`` line.
        ip: Value for ``HostName``.
        port: Value for ``Port``.
        user: Value for ``User``.
    """
    if not os.path.exists(SSH_CONFIG_PATH):
        return
    with open(SSH_CONFIG_PATH, "r") as f:
        content = f.read()

    # Compare whole stripped lines (the same rule remove_from_ssh_config
    # uses) so an entry on the last line without a trailing newline is
    # still detected and a comment mentioning the host is not mistaken
    # for a real entry.
    host_line = f"Host {alias}"
    if any(line.strip() == host_line for line in content.splitlines()):
        return

    with open(SSH_CONFIG_PATH, "a") as f:
        f.write(f"\nHost {alias}\n HostName {ip}\n User {user}\n Port {port}\n")
def remove_from_ssh_config(alias):
    """Remove the ``Host <alias>`` block from the SSH config file.

    The block is the ``Host`` line plus every directly following line that
    starts with a space or a tab. Nothing happens when the config file does
    not exist, and the file is left untouched when no such block is found.

    Args:
        alias: Name on the ``Host`` line to remove.
    """
    if not os.path.exists(SSH_CONFIG_PATH):
        return
    with open(SSH_CONFIG_PATH, "r") as f:
        lines = f.readlines()

    host_line = f"Host {alias}"
    kept, skipping, removed = [], False, False
    for line in lines:
        if line.strip() == host_line:
            skipping = removed = True
            continue
        # Option lines may be indented with spaces OR tabs; leaving
        # tab-indented ones behind would attach them to the preceding Host.
        if skipping and line[:1] in (" ", "\t"):
            continue
        skipping = False
        kept.append(line)

    if removed:
        with open(SSH_CONFIG_PATH, "w") as f:
            f.writelines(kept)
# ── SQL commands ──────────────────────────────────────
def _print_table(headers: list, rows: list):
    """Print *rows* under *headers* as a left-aligned text table.

    Every column is as wide as its longest cell (header included) and a
    dashed rule separates the header from the data. An empty *rows* prints
    ``(no rows)`` and no header.
    """
    if not rows:
        print("(no rows)")
        return

    col_widths = [len(str(title)) for title in headers]
    for record in rows:
        for col, cell in enumerate(record):
            cell_len = len(str(cell))
            if cell_len > col_widths[col]:
                col_widths[col] = cell_len

    # One "{:<N}" placeholder per column, separated by a single space.
    template = " ".join("{:<%d}" % width for width in col_widths)
    print(template.format(*headers))
    print(" ".join(width * "-" for width in col_widths))
    for record in rows:
        print(template.format(*map(str, record)))
def run_sql(server: dict, query: str):
    """Execute SQL query against mariadb/mssql/postgresql server.

    Result sets are printed as a table followed by a row count; statements
    without a result set print the affected row count. The transaction is
    committed in both cases, so row-returning writes (e.g.
    ``INSERT ... RETURNING``) are not rolled back when the connection closes.

    Args:
        server: Server entry (``type``, ``ip``, optional ``port``, ``user``,
            ``password``, ``database``).
        query: SQL text, passed to the driver unchanged.

    Exits with status 1 when the server type is not a supported SQL engine.
    """
    default_ports = {"mariadb": 3306, "mysql": 3306, "mssql": 1433, "postgresql": 5432}
    stype = server.get("type", "mariadb")
    if stype not in default_ports:
        print(f"ERROR: Unsupported SQL type '{stype}'. Use mariadb, mssql, or postgresql.")
        sys.exit(1)

    host = server["ip"]
    port = server.get("port", default_ports[stype])
    user = server.get("user", "root")
    password = server.get("password", "")
    database = server.get("database", "")

    if stype in ("mariadb", "mysql"):
        import pymysql
        conn = pymysql.connect(host=host, port=port, user=user, password=password,
                               database=database or None, connect_timeout=15,
                               charset="utf8mb4", cursorclass=pymysql.cursors.Cursor)
    elif stype == "mssql":
        import pymssql
        conn = pymssql.connect(server=host, port=port, user=user, password=password,
                               database=database or None, login_timeout=15)
    else:  # postgresql
        import psycopg2
        conn = psycopg2.connect(host=host, port=port, user=user, password=password,
                                dbname=database or None, connect_timeout=15)

    try:
        cur = conn.cursor()
        try:
            cur.execute(query)
            if cur.description:
                headers = [desc[0] for desc in cur.description]
                rows = cur.fetchall()
                # Fetch first, then commit: a row-returning statement may
                # still have modified data.
                conn.commit()
                _print_table(headers, rows)
                print(f"\n({len(rows)} row{'s' if len(rows) != 1 else ''})")
            else:
                conn.commit()
                affected = cur.rowcount
                print(f"OK: {affected} row{'s' if affected != 1 else ''} affected")
        finally:
            cur.close()
    finally:
        conn.close()
def sql_databases(server: dict):
    """Print the databases on a SQL server using the engine's own catalog query.

    Exits with status 1 when the server type is not mariadb/mysql, mssql or
    postgresql.
    """
    stype = server.get("type", "mariadb")
    listing_queries = {
        "mariadb": "SHOW DATABASES",
        "mysql": "SHOW DATABASES",
        "mssql": "SELECT name FROM sys.databases ORDER BY name",
        "postgresql": "SELECT datname AS database FROM pg_database WHERE datistemplate = false ORDER BY datname",
    }
    statement = listing_queries.get(stype)
    if statement is None:
        print(f"ERROR: Unsupported SQL type '{stype}'.")
        sys.exit(1)
    run_sql(server, statement)
def sql_tables(server: dict, database: str = None):
    """Print the tables on a SQL server, optionally for a specific database.

    When *database* is given, the query runs with a copy of the server entry
    whose ``database`` is overridden; the caller's dict is not modified.
    Exits with status 1 for an unsupported server type.
    """
    stype = server.get("type", "mariadb")
    if database:
        server = {**server, "database": database}

    if stype in ("mariadb", "mysql"):
        statement = f"SHOW TABLES FROM `{database}`" if database else "SHOW TABLES"
    elif stype == "mssql":
        statement = "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_SCHEMA, TABLE_NAME"
    elif stype == "postgresql":
        statement = "SELECT schemaname, tablename FROM pg_tables WHERE schemaname NOT IN ('pg_catalog', 'information_schema') ORDER BY schemaname, tablename"
    else:
        print(f"ERROR: Unsupported SQL type '{stype}'.")
        sys.exit(1)

    run_sql(server, statement)
# ── Redis commands ────────────────────────────────────
def _get_redis_client(server: dict):
    """Build a ``redis.Redis`` client from a server entry.

    Reads ``ip`` (required), ``port`` (default 6379), ``password``,
    ``db_index`` (default 0) and ``ssl`` (default False). Responses are
    decoded to ``str`` and the socket timeout is 10 seconds.
    """
    import redis as redis_lib

    secret = server.get("password", "")
    options = {
        "host": server["ip"],
        "port": server.get("port", 6379),
        # An empty password is passed as None.
        "password": secret if secret else None,
        "db": server.get("db_index", 0),
        "decode_responses": True,
        "socket_timeout": 10,
        "ssl": server.get("ssl", False),
    }
    return redis_lib.Redis(**options)
def run_redis_cmd(server: dict, command: str):
    """Run one Redis command line and print the reply.

    The command is tokenised with ``shlex`` (falling back to whitespace
    splitting when quoting is unbalanced). List replies are numbered, dict
    replies are printed as ``key: value`` and bytes are decoded as UTF-8.
    Exits with status 1 for an empty command or when Redis reports an error.
    """
    import shlex

    client = _get_redis_client(server)
    try:
        try:
            tokens = shlex.split(command)
        except ValueError:
            tokens = command.split()
        if not tokens:
            print("ERROR: Empty Redis command")
            sys.exit(1)

        try:
            reply = client.execute_command(*tokens)
        except Exception as e:
            print(f"(error) {e}", file=sys.stderr)
            sys.exit(1)

        if isinstance(reply, list):
            for position, element in enumerate(reply, start=1):
                print(f"{position}) {element}")
            print(f"\n({len(reply)} items)")
        elif isinstance(reply, dict):
            for field, value in reply.items():
                print(f"{field}: {value}")
        elif isinstance(reply, bytes):
            print(reply.decode("utf-8", errors="replace"))
        else:
            print(reply)
    finally:
        # Runs on the sys.exit() paths as well.
        client.close()
def redis_info(server: dict):
    """Print a curated subset of Redis ``INFO`` plus the per-database keyspace lines."""
    client = _get_redis_client(server)
    try:
        stats = client.info()

        # Headline fields, printed in this order when the server reports them.
        wanted = (
            "redis_version", "redis_mode", "os", "uptime_in_seconds",
            "connected_clients", "used_memory_human", "used_memory_peak_human",
            "total_connections_received", "total_commands_processed",
            "keyspace_hits", "keyspace_misses", "role",
        )
        print(f"{'Key':<35} {'Value'}")
        print("-" * 60)
        for field in wanted:
            if field in stats:
                print(f"{field:<35} {stats[field]}")

        # Keyspace entries (db0, db1, etc.) in sorted order.
        for field in sorted(stats.keys()):
            if field.startswith("db"):
                print(f"{field:<35} {stats[field]}")
    finally:
        client.close()
def redis_keys(server: dict, pattern: str):
    """SCAN keys matching a pattern and print them sorted.

    Scanning stops early once 1000 distinct keys have been collected; in
    that case exactly 1000 keys are printed, preceded by a truncation
    notice. Keys that SCAN returns more than once are listed once.

    Args:
        server: Redis server entry.
        pattern: Glob-style pattern passed as SCAN's MATCH argument.
    """
    limit = 1000
    r = _get_redis_client(server)
    try:
        found = set()  # SCAN may return the same key more than once
        cursor = 0
        truncated = False
        while True:
            cursor, batch = r.scan(cursor=cursor, match=pattern, count=200)
            found.update(batch)
            if cursor == 0:
                break
            if len(found) >= limit:
                truncated = True
                break
        keys = sorted(found)
        if truncated:
            # The last batch can overshoot the limit; cut back so the
            # output matches the notice.
            del keys[limit:]
            print("(truncated at 1000 keys)")
        for k in keys:
            print(k)
        print(f"\n({len(keys)} key{'s' if len(keys) != 1 else ''})")
    finally:
        r.close()
# ── S3 commands ──────────────────────────────────
def _get_s3_client(server: dict):
    """Create a boto3 S3 client from a server entry.

    Reads ``ip`` as the endpoint (host or full URL), ``use_ssl`` (default
    True) and ``port`` (default 443) to build the endpoint URL, plus
    ``access_key`` / ``secret_key`` for credentials. TLS certificate
    verification stays off unless the entry sets ``ssl_verify`` (True, or a
    CA bundle path as accepted by boto3's ``verify``).

    Exits with status 1 when boto3 (or its dependencies) cannot be imported.
    """
    try:
        import boto3
        import botocore.config
        import urllib3
    except ImportError:
        print("ERROR: boto3 not installed. Run: pip install boto3", file=sys.stderr)
        sys.exit(1)

    verify = server.get("ssl_verify", False)
    if not verify:
        # Only silence the warning when verification is actually disabled.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    endpoint = server.get("ip", "")
    # Test for a real scheme prefix: a bare "http" check would also match
    # hostnames such as "http-store.local".
    if endpoint and not endpoint.lower().startswith(("http://", "https://")):
        use_ssl = server.get("use_ssl", True)
        scheme = "https" if use_ssl else "http"
        port = int(server.get("port", 443))
        if (scheme == "https" and port == 443) or (scheme == "http" and port == 80):
            endpoint = f"{scheme}://{endpoint}"
        else:
            endpoint = f"{scheme}://{endpoint}:{port}"

    config = botocore.config.Config(
        signature_version="s3v4",
        connect_timeout=15,
        read_timeout=60,
        retries={"max_attempts": 5, "mode": "adaptive"},
        tcp_keepalive=True,
    )
    return boto3.client(
        "s3",
        # None lets boto3 pick its default endpoint instead of an invalid "".
        endpoint_url=endpoint or None,
        aws_access_key_id=server.get("access_key", ""),
        aws_secret_access_key=server.get("secret_key", ""),
        config=config,
        verify=verify,
    )
def s3_buckets(server: dict):
    """Print the name and creation time of every bucket.

    Any error raised while listing is printed to stderr and the process
    exits with status 1.
    """
    client = _get_s3_client(server)
    try:
        listing = client.list_buckets().get("Buckets", [])
        if not listing:
            print("(no buckets)")
            return

        print(f"{'Name':<40} {'Created'}")
        print("-" * 65)
        for bucket in listing:
            stamp = bucket.get("CreationDate", "")
            when = stamp.strftime("%Y-%m-%d %H:%M:%S") if stamp else stamp
            print(f"{bucket['Name']:<40} {when}")

        plural = "s" if len(listing) != 1 else ""
        print(f"\n({len(listing)} bucket{plural})")
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)
def s3_ls(server: dict, path: str = ""):
    """List one "directory" level of a bucket.

    Args:
        server: S3 server entry; its ``bucket`` is used when *path* is empty.
        path: ``bucket`` or ``bucket/prefix``. A prefix is treated as a
            folder: a trailing ``/`` is added when missing.

    Sub-prefixes are printed as ``DIR`` lines, objects with size and
    modification time; names are shown relative to the prefix. Errors are
    printed to stderr and exit with status 1.
    """
    client = _get_s3_client(server)

    # Split "bucket/prefix"; with no path fall back to the configured bucket.
    if path:
        bucket, _, prefix = path.partition("/")
    else:
        bucket, prefix = server.get("bucket", ""), ""
    if not bucket:
        print("ERROR: No bucket specified. Usage: --s3-ls ALIAS bucket[/prefix]", file=sys.stderr)
        sys.exit(1)

    try:
        paginator = client.get_paginator("list_objects_v2")
        request = {"Bucket": bucket, "Delimiter": "/"}
        if prefix:
            if not prefix.endswith("/"):
                prefix += "/"
            request["Prefix"] = prefix

        strip = len(prefix)  # 0 without a prefix, so slicing is a no-op
        shown = 0
        for page in paginator.paginate(**request):
            for folder in page.get("CommonPrefixes", []):
                print(f" DIR {folder['Prefix'][strip:]}")
                shown += 1
            for item in page.get("Contents", []):
                full_key = item["Key"]
                if full_key == prefix:
                    # Skip the object whose key is the listed prefix itself.
                    continue
                stamp = item.get("LastModified", "")
                if stamp:
                    stamp = stamp.strftime("%Y-%m-%d %H:%M")
                print(f"{item.get('Size', 0):>10} {stamp} {full_key[strip:]}")
                shown += 1
        print(f"\n({shown} item{'s' if shown != 1 else ''})")
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)
def _s3_transfer_config():
    """Return the boto3 ``TransferConfig`` used by the S3 upload/download commands.

    Multipart transfers start at 8 MiB with 8 MiB chunks, at most 4
    concurrent requests, and up to 10 download attempts.
    """
    from boto3.s3.transfer import TransferConfig

    eight_mib = 8 * 1024 * 1024
    settings = {
        "multipart_threshold": eight_mib,
        "multipart_chunksize": eight_mib,
        "max_concurrency": 4,
        "num_download_attempts": 10,
    }
    return TransferConfig(**settings)
def s3_upload(server: dict, local_path: str, remote_path: str):
    """Upload a file to S3, retrying failed attempts with exponential backoff.

    Args:
        server: S3 server entry; its ``bucket`` is used when *remote_path*
            has no bucket part (empty, or starting with ``/``).
        local_path: Existing local file to upload.
        remote_path: ``bucket/key``. A missing key, or one ending in ``/``,
            is completed with the local file's base name.

    Up to 10 attempts are made, each restarting the upload with a fresh
    client, waiting 2s, 4s, ... (capped at 60s) between attempts. Exits with
    status 1 on a missing bucket, a missing local file, or when every
    attempt fails.
    """
    # Parse bucket/key
    bucket, _, key = remote_path.partition("/")
    bucket = bucket or server.get("bucket", "")
    if not key or key.endswith("/"):
        # "bucket", "bucket/" and "bucket/dir/" all keep the file's own name.
        key += os.path.basename(local_path)
    if not bucket:
        print("ERROR: No bucket. Usage: --s3-upload ALIAS local bucket/key", file=sys.stderr)
        sys.exit(1)
    if not os.path.isfile(local_path):
        print(f"ERROR: File not found: {local_path}", file=sys.stderr)
        sys.exit(1)

    size = os.path.getsize(local_path)
    config = _s3_transfer_config()
    max_retries = 10
    for attempt in range(max_retries):
        client = _get_s3_client(server)
        try:
            print(f"Uploading {local_path} -> s3://{bucket}/{key} ({size} bytes)...")
            client.upload_file(local_path, bucket, key, Config=config)
            print("OK")
            return
        except Exception as e:
            delay = min(2 * (2 ** attempt), 60)
            print(f"Attempt {attempt + 1}/{max_retries} failed: {e}", file=sys.stderr)
            if attempt < max_retries - 1:
                print(f"Retrying in {delay}s...", file=sys.stderr)
                time.sleep(delay)
            else:
                print(f"ERROR: Upload failed after {max_retries} attempts", file=sys.stderr)
                sys.exit(1)
def s3_download(server: dict, remote_path: str, local_path: str):
    """Download an object from S3, retrying failed attempts with exponential backoff.

    Args:
        server: S3 server entry; its ``bucket`` is used when *remote_path*
            starts with ``/`` (no bucket part).
        remote_path: ``bucket/key`` of the object.
        local_path: Destination file path.

    Up to 10 attempts are made with a fresh client each time, waiting 2s,
    4s, ... (capped at 60s) between attempts. Exits with status 1 when the
    bucket or key is missing or when every attempt fails.
    """
    bucket, _, key = remote_path.partition("/")
    bucket = bucket or server.get("bucket", "")
    if not bucket or not key:
        print("ERROR: Usage: --s3-download ALIAS bucket/key local_path", file=sys.stderr)
        sys.exit(1)

    config = _s3_transfer_config()
    max_retries = 10
    for attempt in range(max_retries):
        client = _get_s3_client(server)
        try:
            print(f"Downloading s3://{bucket}/{key} -> {local_path}...")
            client.download_file(bucket, key, local_path, Config=config)
            size = os.path.getsize(local_path)
            print(f"OK ({size} bytes)")
            return
        except Exception as e:
            delay = min(2 * (2 ** attempt), 60)
            print(f"Attempt {attempt + 1}/{max_retries} failed: {e}", file=sys.stderr)
            if attempt < max_retries - 1:
                print(f"Retrying in {delay}s...", file=sys.stderr)
                time.sleep(delay)
            else:
                print(f"ERROR: Download failed after {max_retries} attempts", file=sys.stderr)
                sys.exit(1)
def s3_delete(server: dict, remote_path: str):
    """Delete an object from S3.

    Args:
        server: S3 server entry; its ``bucket`` is used when *remote_path*
            starts with ``/`` (no bucket part).
        remote_path: ``bucket/key`` of the object to delete.

    Exits with status 1 when the bucket or key is missing, or when the
    delete request raises.
    """
    bucket, _, key = remote_path.partition("/")
    bucket = bucket or server.get("bucket", "")
    if not bucket or not key:
        print("ERROR: Usage: --s3-delete ALIAS bucket/key", file=sys.stderr)
        sys.exit(1)

    # Create the client only after the arguments are known to be usable.
    client = _get_s3_client(server)
    try:
        client.delete_object(Bucket=bucket, Key=key)
        print(f"Deleted s3://{bucket}/{key}")
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)
def s3_url(server: dict, remote_path: str, expires: int = 3600):
    """Print a presigned GET URL for an S3 object.

    Args:
        server: S3 server entry; its ``bucket`` is used when *remote_path*
            starts with ``/`` (no bucket part).
        remote_path: ``bucket/key`` of the object.
        expires: Value passed as ``ExpiresIn`` (seconds).

    Exits with status 1 when the bucket or key is missing, or when URL
    generation raises.
    """
    bucket, _, key = remote_path.partition("/")
    bucket = bucket or server.get("bucket", "")
    if not bucket or not key:
        print("ERROR: Usage: --s3-url ALIAS bucket/key [seconds]", file=sys.stderr)
        sys.exit(1)

    # Create the client only after the arguments are known to be usable.
    client = _get_s3_client(server)
    try:
        url = client.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=expires,
        )
        print(url)
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)
def s3_create_bucket(server: dict, bucket_name: str):
    """Create the bucket *bucket_name*.

    A failure is printed to stderr and the process exits with status 1.
    """
    s3 = _get_s3_client(server)
    try:
        s3.create_bucket(Bucket=bucket_name)
        confirmation = f"Bucket created: {bucket_name}"
        print(confirmation)
    except Exception as e:
        failure = f"ERROR: {e}"
        print(failure, file=sys.stderr)
        sys.exit(1)
# ── Grafana commands ──────────────────────────────────
def _grafana_request(server: dict, endpoint: str) -> dict:
    """Make an authenticated GET request to the Grafana HTTP API.

    Args:
        server: Grafana server entry. The base URL is ``base_url`` when
            present, otherwise built from ``ssl``, ``ip`` and ``port``
            (default 3000).
        endpoint: Path below ``/api/`` (may include a query string).

    Returns:
        The decoded JSON body.

    Raises:
        requests.HTTPError: On a non-success HTTP status.

    The bearer credential is ``api_key`` when that key is present; otherwise
    ``token`` (the field ``--add ... --token`` stores), then ``password``.
    """
    import requests

    host = server["ip"]
    port = server.get("port", 3000)
    protocol = "https" if server.get("ssl", False) else "http"
    base_url = server.get("base_url", f"{protocol}://{host}:{port}")

    if "api_key" in server:
        api_key = server["api_key"]
    else:
        # Entries created via --add keep the API token under "token".
        api_key = server.get("token") or server.get("password", "")

    headers = {}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    url = f"{base_url.rstrip('/')}/api/{endpoint.lstrip('/')}"
    resp = requests.get(url, headers=headers, timeout=15, verify=server.get("ssl_verify", True))
    resp.raise_for_status()
    return resp.json()
def grafana_dashboards(server: dict):
    """Print a table of Grafana dashboards (UID, title, folder, URL)."""
    found = _grafana_request(server, "search?type=dash-db")
    if not found:
        print("(no dashboards found)")
        return

    # Dashboards without a folderTitle are shown under "(root)".
    table = [
        [
            board.get("uid", ""),
            board.get("title", ""),
            board.get("folderTitle", "(root)"),
            board.get("url", ""),
        ]
        for board in found
    ]
    _print_table(["UID", "Title", "Folder", "URL"], table)
    print(f"\n({len(table)} dashboard{'s' if len(table) != 1 else ''})")
def grafana_alerts(server: dict):
    """Print the alerts returned by Grafana's built-in Alertmanager endpoint.

    Each row shows the alert state, its ``alertname`` and ``severity``
    labels, and the first 80 characters of the ``summary`` annotation.
    """
    payload = _grafana_request(server, "alertmanager/grafana/api/v2/alerts")
    if not payload:
        print("(no alerts)")
        return

    table = []
    for item in payload:
        state = item.get("status", {}).get("state", "unknown")
        tags = item.get("labels", {})
        notes = item.get("annotations", {})
        summary = notes.get("summary", "")[:80]
        table.append([state, tags.get("alertname", ""), tags.get("severity", ""), summary])

    _print_table(["Status", "Name", "Severity", "Summary"], table)
    print(f"\n({len(table)} alert{'s' if len(table) != 1 else ''})")
# ── Prometheus commands ───────────────────────────────
def _prom_request(server: dict, endpoint: str, params: dict = None) -> dict:
    """GET ``/api/v1/<endpoint>`` from a Prometheus server and return the JSON body.

    The base URL is ``base_url`` when present, otherwise built from ``ssl``,
    ``ip`` and ``port`` (default 9090). HTTP basic auth is sent only when
    both ``user`` and ``password`` are non-empty. A non-success HTTP status
    raises ``requests.HTTPError``.
    """
    import requests

    host = server["ip"]
    port = server.get("port", 9090)
    scheme = "https" if server.get("ssl", False) else "http"
    root = server.get("base_url", f"{scheme}://{host}:{port}")

    user = server.get("user", "")
    password = server.get("password", "")
    credentials = (user, password) if user and password else None

    target = root.rstrip("/") + "/api/v1/" + endpoint.lstrip("/")
    response = requests.get(
        target,
        params=params,
        auth=credentials,
        timeout=15,
        verify=server.get("ssl_verify", True),
    )
    response.raise_for_status()
    return response.json()
def prom_query(server: dict, query: str):
    """Execute a PromQL instant query and print the result.

    Vectors are printed as a table, scalars and strings as a single line,
    and matrices as one section per series showing the last 20 samples.
    Exits with status 1 when Prometheus reports a status other than
    ``success``.

    Args:
        server: Prometheus server entry.
        query: PromQL expression.
    """
    data = _prom_request(server, "query", {"query": query})
    status = data.get("status", "")
    if status != "success":
        print(f"ERROR: Prometheus returned status '{status}'")
        if "error" in data:
            print(f" {data['error']}")
        sys.exit(1)

    result = data.get("data", {})
    result_type = result.get("resultType", "")
    results = result.get("result", [])
    if not results:
        print("(no results)")
        return

    count = len(results)
    if result_type == "vector":
        headers = ["Metric", "Value", "Timestamp"]
        rows = []
        for r in results:
            metric = r.get("metric", {})
            label_str = ", ".join(f'{k}="{v}"' for k, v in metric.items())
            ts, val = r.get("value", [0, ""])
            rows.append([label_str or "{}", val, ts])
        _print_table(headers, rows)
    elif result_type in ("scalar", "string"):
        # These results are a single [timestamp, value] pair, not a list
        # of samples, so they count as one result.
        ts, val = results
        count = 1
        label = "Scalar" if result_type == "scalar" else "String"
        print(f"{label}: {val} (at {ts})")
    elif result_type == "matrix":
        for series in results:
            metric = series.get("metric", {})
            label_str = ", ".join(f'{k}="{v}"' for k, v in metric.items())
            print(f"\n--- {label_str or '{}'} ---")
            values = series.get("values", [])
            for ts, val in values[-20:]:  # last 20 samples
                print(f" [{ts}] {val}")
            if len(values) > 20:
                print(f" ... ({len(values)} total samples, showing last 20)")

    print(f"\n({count} result{'s' if count != 1 else ''}, type: {result_type})")
def prom_targets(server: dict):
    """Print the active Prometheus scrape targets.

    Columns: the ``job`` and ``instance`` labels, the target's scrape pool,
    its health, and the first 19 characters of the last scrape timestamp.
    """
    data = _prom_request(server, "targets")
    active = data.get("data", {}).get("activeTargets", [])
    if not active:
        print("(no active targets)")
        return

    # The third column holds scrapePool, so it is labelled "Pool".
    headers = ["Job", "Instance", "Pool", "Health", "Last Scrape"]
    rows = []
    for t in active:
        labels = t.get("labels", {})
        rows.append([
            labels.get("job", ""),
            labels.get("instance", ""),
            t.get("scrapePool", ""),
            t.get("health", ""),
            t.get("lastScrape", "")[:19],
        ])
    _print_table(headers, rows)
    print(f"\n({len(rows)} target{'s' if len(rows) != 1 else ''})")
def prom_alerts(server: dict):
    """Print the alerts reported by Prometheus.

    Each row shows the alert state, its ``alertname`` and ``severity``
    labels, and the first 19 characters of ``activeAt``.
    """
    payload = _prom_request(server, "alerts")
    firing = payload.get("data", {}).get("alerts", [])
    if not firing:
        print("(no alerts)")
        return

    table = []
    for item in firing:
        tags = item.get("labels", {})
        since = item.get("activeAt", "")[:19]
        table.append([item.get("state", ""), tags.get("alertname", ""), tags.get("severity", ""), since])

    _print_table(["State", "Name", "Severity", "Active Since"], table)
    print(f"\n({len(table)} alert{'s' if len(table) != 1 else ''})")
# ── WinRM commands ────────────────────────────────────
def _get_winrm_session(server: dict):
    """Build a ``winrm.Session`` for a server entry.

    HTTPS is used when ``ssl`` is set or the port is 5986; in that case
    certificate validation is set to ``ignore``. Defaults: port 5985, user
    ``Administrator``, transport ``ntlm``.
    """
    import winrm

    host = server["ip"]
    port = server.get("port", 5985)
    user = server.get("user", "Administrator")
    password = server.get("password", "")

    if server.get("ssl", False) or port == 5986:
        protocol, cert_mode = "https", "ignore"
    else:
        protocol, cert_mode = "http", "validate"
    transport = server.get("transport", "ntlm")

    return winrm.Session(
        f"{protocol}://{host}:{port}/wsman",
        auth=(user, password),
        transport=transport,
        server_cert_validation=cert_mode,
    )
def run_winrm_ps(server: dict, command: str):
    """Run a PowerShell command over WinRM and exit with its status code.

    Decoded, stripped stdout and stderr are echoed to the matching local
    streams (empty output is skipped). This function never returns: it
    always ends in ``sys.exit``.
    """
    session = _get_winrm_session(server)
    outcome = session.run_ps(command)

    stdout_text = outcome.std_out.decode("utf-8", errors="replace").strip()
    stderr_text = outcome.std_err.decode("utf-8", errors="replace").strip()

    for text, stream in ((stdout_text, sys.stdout), (stderr_text, sys.stderr)):
        if text:
            print(text, file=stream)

    sys.exit(outcome.status_code)
def run_winrm_cmd(server: dict, command: str):
    """Run a CMD command over WinRM and exit with its status code.

    Decoded, stripped stdout and stderr are echoed to the matching local
    streams (empty output is skipped). This function never returns: it
    always ends in ``sys.exit``.
    """
    outcome = _get_winrm_session(server).run_cmd(command)

    def _as_text(raw):
        return raw.decode("utf-8", errors="replace").strip()

    captured_out = _as_text(outcome.std_out)
    captured_err = _as_text(outcome.std_err)

    if captured_out:
        print(captured_out)
    if captured_err:
        print(captured_err, file=sys.stderr)

    sys.exit(outcome.status_code)
# ── Main ──────────────────────────────────────────────
def main():
    """Parse ``sys.argv`` and dispatch to the matching command.

    Three families of invocation are handled, in this order:

    1. Global flags (``--list``, ``--status``, ``--info``, ``--add`` ...).
    2. Typed flags of the form ``--flag ALIAS [args]`` (SQL, Redis, S3,
       Grafana, Prometheus, WinRM), looked up in a dispatch table.
    3. ``ALIAS <action|command>`` for SSH-style servers.

    A known flag with too few arguments prints a usage line and exits with
    status 1 instead of being treated as a server alias. With no arguments
    at all the module docstring is printed and the exit status is 1.
    """
    argv = sys.argv
    if len(argv) < 2:
        print(__doc__)
        sys.exit(1)

    cmd = argv[1]
    extra = argv[2:]

    def require(count, usage):
        """Exit with a usage hint unless at least *count* arguments follow the flag."""
        if len(extra) < count:
            print(f"Usage: ssh.py {cmd} {usage}")
            sys.exit(1)

    def finish(out, err, code):
        """Echo captured remote output and exit with the remote status."""
        if out:
            print(out, end="")
        if err:
            print(err, end="", file=sys.stderr)
        sys.exit(code)

    # ── Global commands ──
    if cmd == "--list":
        list_servers()
        sys.exit(0)
    if cmd == "--list-full":
        list_servers(full=True)
        sys.exit(0)
    if cmd == "--status":
        check_status()
        sys.exit(0)
    if cmd == "--info":
        require(1, "ALIAS")
        server_info(extra[0])
        sys.exit(0)
    if cmd == "--set-note":
        require(2, 'ALIAS "desc"')
        set_note(extra[0], extra[1])
        sys.exit(0)
    if cmd == "--add":
        add_server(extra)
        sys.exit(0)
    if cmd == "--remove":
        require(1, "ALIAS")
        remove_server(extra[0])
        sys.exit(0)

    # ── Typed commands: flag -> (args required incl. ALIAS, usage, handler) ──
    # Handlers receive the resolved server entry and the arguments after ALIAS.
    typed_commands = {
        # SQL
        "--sql": (2, 'ALIAS "query"', lambda srv, a: run_sql(srv, a[0])),
        "--sql-databases": (1, "ALIAS", lambda srv, a: sql_databases(srv)),
        "--sql-tables": (1, "ALIAS [database]",
                         lambda srv, a: sql_tables(srv, a[0] if a else None)),
        # Redis
        "--redis": (2, 'ALIAS "command"', lambda srv, a: run_redis_cmd(srv, a[0])),
        "--redis-info": (1, "ALIAS", lambda srv, a: redis_info(srv)),
        "--redis-keys": (2, 'ALIAS "pattern"', lambda srv, a: redis_keys(srv, a[0])),
        # S3
        "--s3-buckets": (1, "ALIAS", lambda srv, a: s3_buckets(srv)),
        "--s3-ls": (1, "ALIAS [bucket[/prefix]]",
                    lambda srv, a: s3_ls(srv, a[0] if a else "")),
        "--s3-upload": (3, "ALIAS local bucket/key",
                        lambda srv, a: s3_upload(srv, a[0], a[1])),
        "--s3-download": (3, "ALIAS bucket/key local",
                          lambda srv, a: s3_download(srv, a[0], a[1])),
        "--s3-delete": (2, "ALIAS bucket/key", lambda srv, a: s3_delete(srv, a[0])),
        "--s3-url": (2, "ALIAS bucket/key [seconds]",
                     lambda srv, a: s3_url(srv, a[0], int(a[1]) if len(a) >= 2 else 3600)),
        "--s3-create-bucket": (2, "ALIAS bucket",
                               lambda srv, a: s3_create_bucket(srv, a[0])),
        # Grafana
        "--grafana-dashboards": (1, "ALIAS", lambda srv, a: grafana_dashboards(srv)),
        "--grafana-alerts": (1, "ALIAS", lambda srv, a: grafana_alerts(srv)),
        # Prometheus
        "--prom-query": (2, 'ALIAS "query"', lambda srv, a: prom_query(srv, a[0])),
        "--prom-targets": (1, "ALIAS", lambda srv, a: prom_targets(srv)),
        "--prom-alerts": (1, "ALIAS", lambda srv, a: prom_alerts(srv)),
        # WinRM (these handlers exit with the remote status themselves)
        "--ps": (2, 'ALIAS "command"', lambda srv, a: run_winrm_ps(srv, a[0])),
        "--cmd": (2, 'ALIAS "command"', lambda srv, a: run_winrm_cmd(srv, a[0])),
    }
    if cmd in typed_commands:
        needed, usage, handler = typed_commands[cmd]
        require(needed, usage)
        _, servers = load_servers()
        alias = _resolve_alias(extra[0], servers)
        handler(servers[alias], extra[1:])
        sys.exit(0)

    # ── Server commands — exact match first, then fuzzy search by keyword ──
    _, servers = load_servers()
    alias = _resolve_alias(cmd, servers)
    server = servers[alias]
    if not extra:
        print(f"Usage: ssh.py {alias} <command>")
        sys.exit(1)

    action = extra[0]
    if action == "--install-key":
        install_key(server)
    elif action == "--ping":
        ping_server(server)
    elif action in ("--upload", "--download"):
        # Never let an incomplete transfer request fall through and be
        # executed on the server as a shell command.
        if len(extra) < 3:
            operands = "LOCAL REMOTE" if action == "--upload" else "REMOTE LOCAL"
            print(f"Usage: ssh.py {alias} {action} {operands}")
            sys.exit(1)
        if action == "--upload":
            upload_file(server, extra[1], extra[2])
        else:
            download_file(server, extra[1], extra[2])
    elif action == "--no-sudo":
        finish(*run_command(server, " ".join(extra[1:]), use_sudo=False))
    else:
        finish(*run_command(server, " ".join(extra)))
if __name__ == "__main__":
    # Top-level boundary: report any unexpected error as a single line on
    # stderr and exit with status 1. SystemExit is not an Exception
    # subclass, so exits raised by the commands pass through unchanged.
    try:
        main()
    except Exception as exc:
        failure = f"ERROR: {type(exc).__name__}: {exc}"
        print(failure, file=sys.stderr)
        sys.exit(1)