""" Local AI agent integration setup. Installs the shared ssh.py/encryption.py backend, Claude /ssh command, Codex/Gemini skill packages, platform-specific wrappers, and SSH key material. """ import os import re import shutil import subprocess import sys from core.logger import log # PyInstaller: bundled data is in sys._MEIPASS; otherwise use project dir if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"): _BASE_DIR = sys._MEIPASS else: _BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) SSH_SCRIPT_SRC = os.path.join(_BASE_DIR, "tools", "ssh.py") ENCRYPTION_SRC = os.path.join(_BASE_DIR, "core", "encryption.py") CLAUDE_SKILL_SRC = os.path.join(_BASE_DIR, "tools", "skill-ssh.md") GEMINI_CONTRACT_SRC = os.path.join(_BASE_DIR, "GEMINI.md") CODEX_SKILL_SRC_DIR = os.path.join(_BASE_DIR, ".codex", "skills", "server-manager") CODEX_WRAPPER_SRC_SH = os.path.join(CODEX_SKILL_SRC_DIR, "scripts", "codex-ssh-wrapper.sh") CODEX_WRAPPER_SRC_CMD = os.path.join(CODEX_SKILL_SRC_DIR, "scripts", "codex-ssh-wrapper.cmd") GEMINI_SKILL_SRC_DIR = os.path.join(_BASE_DIR, ".gemini", "skills", "server-manager") GEMINI_WRAPPER_SRC_SH = os.path.join(GEMINI_SKILL_SRC_DIR, "scripts", "gemini-ssh-wrapper.sh") GEMINI_WRAPPER_SRC_CMD = os.path.join(GEMINI_SKILL_SRC_DIR, "scripts", "gemini-ssh-wrapper.cmd") _BLOCK_START = "" _BLOCK_END = "" _GEMINI_BLOCK_START = "" _GEMINI_BLOCK_END = "" GLOBAL_CLAUDE_MD_BLOCK = f"""{_BLOCK_START} ## Серверы — ТОЛЬКО через /ssh **НИКОГДА не используй raw `ssh` команды.** НИКОГДА не читай `~/.ssh/config` для поиска серверов. Все операции с серверами — **ТОЛЬКО через скилл `/ssh`** или напрямую через `ssh.py`: ```bash python ~/.server-connections/ssh.py --list # список серверов (alias, тип, заметки) python ~/.server-connections/ssh.py --info ALIAS # инфо (без creds) python ~/.server-connections/ssh.py --status # online/offline ``` При вопросе о сервере — **СНАЧАЛА `--list`**, найди нужный алиас по заметкам и **ПРОВЕРЬ ТИП**. Скрипт `ssh.py` сам читает credentials из зашифрованного хранилища. Claude НЕ видит IP, логины, пароли. ### КРИТИЧНО — команды зависят от типа сервера **`ALIAS "command"` (shell) — ТОЛЬКО для типов `ssh` и `telnet`!** | Тип | Команды | |-----|---------| | `ssh`/`telnet` | `ALIAS "cmd"`, `--upload ALIAS local remote`, `--download ALIAS remote local` | | `s3` (MinIO и др.) | `--s3-buckets ALIAS`, `--s3-ls ALIAS bucket/prefix`, `--s3-upload ALIAS local bucket/key`, `--s3-download ALIAS bucket/key local`, `--s3-delete ALIAS bucket/key`, `--s3-url ALIAS bucket/key [SEC]`, `--s3-create-bucket ALIAS name` | | `mariadb`/`mssql`/`postgresql` | `--sql ALIAS "SELECT ..."`, `--sql-databases ALIAS`, `--sql-tables ALIAS [db]` | | `redis` | `--redis ALIAS "GET key"`, `--redis-info ALIAS`, `--redis-keys ALIAS "pattern"` | | `grafana` | `--grafana-dashboards ALIAS`, `--grafana-alerts ALIAS` | | `prometheus` | `--prom-query ALIAS "up"`, `--prom-targets ALIAS`, `--prom-alerts ALIAS` | | `winrm` | `--ps ALIAS "Get-Process"`, `--cmd ALIAS "dir"` | **Формат: `python ~/.server-connections/ssh.py КОМАНДА АЛИАС АРГУМЕНТЫ`** — алиас ВСЕГДА второй после команды. **S3 правило:** перед `--s3-upload/download/delete` — СНАЧАЛА `--s3-buckets ALIAS` и `--s3-ls ALIAS bucket/` чтобы узнать реальные бакеты и пути. НЕ УГАДЫВАЙ имена бакетов! **Запрещено:** использовать `ssh`/`sshpass`, читать `~/.server-connections/` напрямую, раскрывать IP/пароли/порты. {_BLOCK_END} """ GLOBAL_GEMINI_MD_BLOCK = f"""{_GEMINI_BLOCK_START} ## ServerManager — use the installed skill When a user asks about a server managed by ServerManager, use the installed `server-manager` skill first. Preferred discovery commands: ```bash $HOME/.server-connections/gemini-ssh --list $HOME/.server-connections/gemini-ssh --info ALIAS $HOME/.server-connections/gemini-ssh --status ``` Rules: - Never read `~/.server-connections/servers.json`, `settings.json`, or `encryption.py` directly. - Never use `--list-full`. - Never use raw `ssh`, `scp`, `redis-cli`, `psql`, `mysql`, `mc`, or cloud CLIs unless the user explicitly asks to bypass ServerManager. - Choose commands strictly by the endpoint type reported by `--list`. - Use exactly one connection attempt per action and stop on timeout/failure. {_GEMINI_BLOCK_END} """ def _target_home() -> str: override = os.environ.get("SERVER_MANAGER_TARGET_HOME", "").strip() if override: return os.path.abspath(os.path.expanduser(override)) return os.path.expanduser("~") def _shared_dir() -> str: return os.path.join(_target_home(), ".server-connections") def _gemini_dir() -> str: return os.path.join(_target_home(), ".gemini") def _claude_skill_dst_dir() -> str: return os.path.join(_target_home(), ".claude", "commands") def _claude_skill_dst() -> str: return os.path.join(_claude_skill_dst_dir(), "ssh.md") def _ssh_key_path() -> str: return os.path.join(_target_home(), ".ssh", "id_ed25519") def _global_claude_md() -> str: return os.path.join(_target_home(), ".claude", "CLAUDE.md") def _global_gemini_md() -> str: return os.path.join(_gemini_dir(), "GEMINI.md") def _codex_skill_dst_root() -> str: return os.path.join(_target_home(), ".codex", "skills") def _codex_skill_dst_dir() -> str: return os.path.join(_codex_skill_dst_root(), "server-manager") def _codex_skill_entry() -> str: return os.path.join(_codex_skill_dst_dir(), "SKILL.md") def _codex_wrapper_dst() -> str: return os.path.join( _shared_dir(), "codex-ssh.cmd" if sys.platform == "win32" else "codex-ssh", ) def _gemini_skill_dst_root() -> str: return os.path.join(_gemini_dir(), "skills") def _gemini_skill_dst_dir() -> str: return os.path.join(_gemini_skill_dst_root(), "server-manager") def _gemini_skill_entry() -> str: return os.path.join(_gemini_skill_dst_dir(), "SKILL.md") def _agents_skill_dst_root() -> str: return os.path.join(_target_home(), ".agents", "skills") def _agents_skill_dst_dir() -> str: return os.path.join(_agents_skill_dst_root(), "server-manager") def _gemini_wrapper_dst() -> str: return os.path.join( _shared_dir(), "gemini-ssh.cmd" if sys.platform == "win32" else "gemini-ssh", ) def _ensure_executable(path: str): if sys.platform == "win32" or not os.path.exists(path): return mode = os.stat(path).st_mode os.chmod(path, mode | 0o755) def _copy_file(src: str, dst: str, executable: bool = False) -> str: os.makedirs(os.path.dirname(dst), exist_ok=True) shutil.copy2(src, dst) if executable: _ensure_executable(dst) return dst def _copy_tree(src: str, dst: str) -> str: os.makedirs(dst, exist_ok=True) shutil.copytree(src, dst, dirs_exist_ok=True) return dst def _install_wrapper(src: str, dst: str) -> str: return _copy_file(src, dst, executable=(sys.platform != "win32")) def _skill_script_names() -> list[str]: if sys.platform == "win32": return [ os.path.join("scripts", "server-manager-doctor.cmd"), os.path.join("scripts", "codex-ssh-wrapper.cmd"), os.path.join("scripts", "server-manager-gemini-doctor.cmd"), os.path.join("scripts", "gemini-ssh-wrapper.cmd"), ] return [ os.path.join("scripts", "server-manager-doctor.sh"), os.path.join("scripts", "codex-ssh-wrapper.sh"), os.path.join("scripts", "server-manager-gemini-doctor.sh"), os.path.join("scripts", "gemini-ssh-wrapper.sh"), ] def _ensure_skill_scripts(skill_dir: str): for rel_path in _skill_script_names(): _ensure_executable(os.path.join(skill_dir, rel_path)) def _iter_all_user_homes() -> list[str]: homes: list[str] = [] def add(path: str): expanded = os.path.abspath(os.path.expanduser(path)) if os.path.isdir(expanded) and expanded not in homes: homes.append(expanded) add(_target_home()) if sys.platform == "win32": users_root = os.path.join(os.environ.get("SystemDrive", "C:"), "Users") skip = {"public", "default", "default user", "all users"} if os.path.isdir(users_root): for name in sorted(os.listdir(users_root)): if name.lower() in skip: continue add(os.path.join(users_root, name)) elif sys.platform == "darwin": add("/var/root") users_root = "/Users" if os.path.isdir(users_root): for name in sorted(os.listdir(users_root)): if name.startswith("."): continue add(os.path.join(users_root, name)) else: add("/root") users_root = "/home" if os.path.isdir(users_root): for name in sorted(os.listdir(users_root)): if name.startswith("."): continue add(os.path.join(users_root, name)) return homes def check_status() -> dict: """Check what's installed and what's missing.""" shared_dir = _shared_dir() ssh_key_path = _ssh_key_path() return { "target_home": _target_home(), "shared_dir": os.path.exists(shared_dir), "servers_json": os.path.exists(os.path.join(shared_dir, "servers.json")), "ssh_script": os.path.exists(os.path.join(shared_dir, "ssh.py")), "encryption": os.path.exists(os.path.join(shared_dir, "encryption.py")), "claude_skill_installed": os.path.exists(_claude_skill_dst()), "codex_skill_installed": os.path.exists(_codex_skill_entry()), "codex_wrapper_installed": os.path.exists(_codex_wrapper_dst()), "gemini_skill_installed": os.path.exists(_gemini_skill_entry()), "gemini_wrapper_installed": os.path.exists(_gemini_wrapper_dst()), "ssh_key_exists": os.path.exists(ssh_key_path), "ssh_key_pub": os.path.exists(ssh_key_path + ".pub"), } def install_ssh_script() -> str: """Copy ssh.py and encryption.py to shared dir.""" shared_dir = _shared_dir() os.makedirs(shared_dir, exist_ok=True) results = [] dst = os.path.join(shared_dir, "ssh.py") if os.path.exists(SSH_SCRIPT_SRC): _copy_file(SSH_SCRIPT_SRC, dst, executable=True) log.info(f"ssh.py installed: {dst}") results.append(f"ssh.py installed: {dst}") elif os.path.exists(dst): results.append(f"ssh.py already exists: {dst}") else: results.append("ERROR: ssh.py source not found") enc_dst = os.path.join(shared_dir, "encryption.py") if os.path.exists(ENCRYPTION_SRC): _copy_file(ENCRYPTION_SRC, enc_dst) log.info(f"encryption.py installed: {enc_dst}") results.append(f"encryption.py installed: {enc_dst}") elif os.path.exists(enc_dst): results.append(f"encryption.py already exists: {enc_dst}") else: results.append("ERROR: encryption.py source not found") return "\n".join(results) def install_claude_skill() -> str: """Install /ssh skill for Claude Code.""" claude_skill_dst_dir = _claude_skill_dst_dir() claude_skill_dst = _claude_skill_dst() os.makedirs(claude_skill_dst_dir, exist_ok=True) if os.path.exists(CLAUDE_SKILL_SRC): _copy_file(CLAUDE_SKILL_SRC, claude_skill_dst) log.info(f"Claude skill installed: {claude_skill_dst}") return f"Claude skill installed: {claude_skill_dst}" if os.path.exists(claude_skill_dst): return f"Claude skill already exists: {claude_skill_dst}" skill_content = _generate_skill_content() with open(claude_skill_dst, "w", encoding="utf-8") as f: f.write(skill_content) log.info(f"Claude skill generated: {claude_skill_dst}") return f"Claude skill generated: {claude_skill_dst}" def install_codex_skill() -> str: """Install ServerManager skill package for Codex and the local wrapper.""" results = [] codex_skill_dst_dir = _codex_skill_dst_dir() codex_skill_entry = _codex_skill_entry() codex_wrapper_dst = _codex_wrapper_dst() if os.path.isdir(CODEX_SKILL_SRC_DIR): _copy_tree(CODEX_SKILL_SRC_DIR, codex_skill_dst_dir) _ensure_skill_scripts(codex_skill_dst_dir) log.info(f"Codex skill installed: {codex_skill_dst_dir}") results.append(f"Codex skill installed: {codex_skill_dst_dir}") elif os.path.exists(codex_skill_entry): results.append(f"Codex skill already exists: {codex_skill_dst_dir}") else: results.append("ERROR: Codex skill source not found") wrapper_src = CODEX_WRAPPER_SRC_CMD if sys.platform == "win32" else CODEX_WRAPPER_SRC_SH if os.path.exists(wrapper_src): _copy_file(wrapper_src, codex_wrapper_dst, executable=(sys.platform != "win32")) log.info(f"Codex wrapper installed: {codex_wrapper_dst}") results.append(f"Codex wrapper installed: {codex_wrapper_dst}") elif os.path.exists(codex_wrapper_dst): results.append(f"Codex wrapper already exists: {codex_wrapper_dst}") else: results.append("ERROR: Codex wrapper source not found") return "\n".join(results) def install_gemini_skill() -> str: """Install ServerManager skill package for Gemini.""" results = [] gemini_skill_dst_dir = _gemini_skill_dst_dir() gemini_skill_entry = _gemini_skill_entry() agents_skill_dst_dir = _agents_skill_dst_dir() gemini_wrapper_dst = _gemini_wrapper_dst() install_generic_mirror = os.environ.get( "SERVER_MANAGER_INSTALL_GENERIC_SKILL_MIRROR", "" ).strip() == "1" if os.path.isdir(GEMINI_SKILL_SRC_DIR): _copy_tree(GEMINI_SKILL_SRC_DIR, gemini_skill_dst_dir) _ensure_skill_scripts(gemini_skill_dst_dir) log.info(f"Gemini skill installed: {gemini_skill_dst_dir}") results.append(f"Gemini skill installed: {gemini_skill_dst_dir}") if install_generic_mirror: _copy_tree(GEMINI_SKILL_SRC_DIR, agents_skill_dst_dir) _ensure_skill_scripts(agents_skill_dst_dir) log.info(f"Generic agents skill mirror installed: {agents_skill_dst_dir}") results.append(f"Generic agents skill mirror installed: {agents_skill_dst_dir}") elif os.path.exists(agents_skill_dst_dir): shutil.rmtree(agents_skill_dst_dir, ignore_errors=True) log.info(f"Removed generic agents skill mirror to avoid Gemini conflicts: {agents_skill_dst_dir}") results.append( f"Removed generic agents skill mirror to avoid Gemini conflicts: {agents_skill_dst_dir}" ) elif os.path.exists(gemini_skill_entry): results.append(f"Gemini skill already exists: {gemini_skill_dst_dir}") else: results.append("ERROR: Gemini skill source not found") wrapper_src = GEMINI_WRAPPER_SRC_CMD if sys.platform == "win32" else GEMINI_WRAPPER_SRC_SH if not os.path.exists(wrapper_src): wrapper_src = CODEX_WRAPPER_SRC_CMD if sys.platform == "win32" else CODEX_WRAPPER_SRC_SH if os.path.exists(wrapper_src): _install_wrapper(wrapper_src, gemini_wrapper_dst) log.info(f"Gemini wrapper installed: {gemini_wrapper_dst}") results.append(f"Gemini wrapper installed: {gemini_wrapper_dst}") elif os.path.exists(gemini_wrapper_dst): results.append(f"Gemini wrapper already exists: {gemini_wrapper_dst}") else: results.append("ERROR: Gemini wrapper source not found") return "\n".join(results) def install_skill() -> str: """Backward-compatible alias for the Claude /ssh skill installer.""" return install_claude_skill() def generate_ssh_key() -> str: """Generate ed25519 SSH key if not exists.""" ssh_key_path = _ssh_key_path() if os.path.exists(ssh_key_path): return f"Key already exists: {ssh_key_path}" os.makedirs(os.path.dirname(ssh_key_path), exist_ok=True) try: subprocess.run( ["ssh-keygen", "-t", "ed25519", "-f", ssh_key_path, "-N", "", "-C", "server-manager"], check=True, capture_output=True, timeout=15 ) log.info(f"SSH key generated: {ssh_key_path}") return f"Key generated: {ssh_key_path}" except FileNotFoundError: hint = "enable OpenSSH optional feature" if sys.platform == "win32" else "install openssh-client" msg = f"ERROR: ssh-keygen not found — {hint}" log.error(msg) return msg except subprocess.CalledProcessError as e: msg = f"ERROR: ssh-keygen failed: {e.stderr.decode().strip()}" log.error(msg) return msg def install_global_claude_md() -> str: """Add/update server manager section in global ~/.claude/CLAUDE.md.""" global_claude_md = _global_claude_md() os.makedirs(os.path.dirname(global_claude_md), exist_ok=True) existing = "" if os.path.exists(global_claude_md): with open(global_claude_md, encoding="utf-8") as f: existing = f.read() pattern = re.compile( re.escape(_BLOCK_START) + r".*?" + re.escape(_BLOCK_END), re.DOTALL ) if pattern.search(existing): updated = pattern.sub(GLOBAL_CLAUDE_MD_BLOCK.strip(), existing) with open(global_claude_md, "w", encoding="utf-8") as f: f.write(updated) log.info(f"Global CLAUDE.md block updated: {global_claude_md}") return f"Global CLAUDE.md block updated: {global_claude_md}" with open(global_claude_md, "a", encoding="utf-8") as f: if existing and not existing.endswith("\n"): f.write("\n") f.write("\n" + GLOBAL_CLAUDE_MD_BLOCK) log.info(f"Global CLAUDE.md block added: {global_claude_md}") return f"Global CLAUDE.md block added: {global_claude_md}" def install_global_gemini_md() -> str: """Add/update server manager section in global ~/.gemini/GEMINI.md.""" global_gemini_md = _global_gemini_md() os.makedirs(os.path.dirname(global_gemini_md), exist_ok=True) existing = "" if os.path.exists(global_gemini_md): with open(global_gemini_md, encoding="utf-8") as f: existing = f.read() pattern = re.compile( re.escape(_GEMINI_BLOCK_START) + r".*?" + re.escape(_GEMINI_BLOCK_END), re.DOTALL ) if pattern.search(existing): updated = pattern.sub(GLOBAL_GEMINI_MD_BLOCK.strip(), existing) with open(global_gemini_md, "w", encoding="utf-8") as f: f.write(updated) log.info(f"Global GEMINI.md block updated: {global_gemini_md}") return f"Global GEMINI.md block updated: {global_gemini_md}" with open(global_gemini_md, "a", encoding="utf-8") as f: if existing and not existing.endswith("\n"): f.write("\n") f.write("\n" + GLOBAL_GEMINI_MD_BLOCK) log.info(f"Global GEMINI.md block added: {global_gemini_md}") return f"Global GEMINI.md block added: {global_gemini_md}" def install_all() -> list[str]: """Full setup — install everything for Claude Code, Codex, and Gemini.""" all_users = os.environ.get("SERVER_MANAGER_INSTALL_ALL_USERS", "").strip() == "1" base_steps = [ ("ssh_script", install_ssh_script), ("claude_skill", install_claude_skill), ("codex_skill", install_codex_skill), ("gemini_skill", install_gemini_skill), ("global_claude_md", install_global_claude_md), ("global_gemini_md", install_global_gemini_md), ] if not all_users: steps = base_steps[:3] + [("ssh_key", generate_ssh_key)] + base_steps[3:] results = [] for name, func in steps: try: log.info(f"install_all: running {name}") result = func() results.append(result) except Exception as e: msg = f"ERROR ({name}): {e}" log.error(msg) results.append(msg) return results results = [] original_target = os.environ.get("SERVER_MANAGER_TARGET_HOME") for home in _iter_all_user_homes(): os.environ["SERVER_MANAGER_TARGET_HOME"] = home results.append(f"[target_home] {home}") for name, func in base_steps: try: log.info(f"install_all(all_users): running {name} for {home}") result = func() results.append(result) except Exception as e: msg = f"ERROR ({name}, {home}): {e}" log.error(msg) results.append(msg) if original_target is None: os.environ.pop("SERVER_MANAGER_TARGET_HOME", None) else: os.environ["SERVER_MANAGER_TARGET_HOME"] = original_target results.append("INFO: SSH key generation skipped in SERVER_MANAGER_INSTALL_ALL_USERS=1 mode") return results def _generate_skill_content() -> str: """Generate /ssh skill markdown (fallback if skill-ssh.md not found).""" return """# Скилл /ssh — управление удалёнными серверами Ты управляешь удалёнными серверами через SSH-утилиту. ## ВАЖНО — Безопасность - **НИКОГДА не читай файлы** в директории `~/.server-connections/` напрямую - **НИКОГДА не читай** файлы `encryption.py`, `servers.json`, `settings.json` - **НИКОГДА не выводи пароли, IP-адреса, логины, порты** пользователю и в контекст нейронки - **НИКОГДА не используй** `--list-full` — он выводит IP/порты/логины в контекст AI - **Все операции только через** `python ~/.server-connections/ssh.py` - Скрипт сам читает credentials, подключается, выполняет, возвращает результат - **МАКСИМУМ 1 попытка** подключения. Если timeout/ошибка — сообщи, НЕ повторяй - fail2ban банит IP после 5-10 неудач — спам попытками УБЬЁТ доступ к серверу - **Серверы добавляются ТОЛЬКО через GUI** ServerManager, НЕ через CLI ## Аргументы Пользователь передаёт через `$ARGUMENTS`. Разбери и выполни. ## Команды ### Список серверов (безопасный — alias, тип, ключ, заметки) ```bash python ~/.server-connections/ssh.py --list ``` ### Информация о сервере (безопасная — без IP/логина/пароля/порта) ```bash python ~/.server-connections/ssh.py --info ALIAS ``` ### Статус всех серверов (alias + online/offline) ```bash python ~/.server-connections/ssh.py --status ``` ### Выполнить команду на сервере ```bash python ~/.server-connections/ssh.py ALIAS "command" ``` ### Выполнить команду БЕЗ sudo ```bash python ~/.server-connections/ssh.py ALIAS --no-sudo "command" ``` ### Загрузить файл на сервер ```bash python ~/.server-connections/ssh.py ALIAS --upload "local/file" //remote/path/file ``` **ВАЖНО (Windows/Git Bash):** remote path ОБЯЗАТЕЛЬНО с двойным слешем `//home/...`, `//tmp/...`. ### Скачать файл с сервера ```bash python ~/.server-connections/ssh.py ALIAS --download //remote/path/file "local/file" ``` ### Установить SSH-ключ на сервер ```bash python ~/.server-connections/ssh.py ALIAS --install-key ``` ### Проверить доступность сервера ```bash python ~/.server-connections/ssh.py ALIAS --ping ``` ### Обновить заметки сервера ```bash python ~/.server-connections/ssh.py --set-note ALIAS "описание сервера" ``` ### Удалить сервер ```bash python ~/.server-connections/ssh.py --remove ALIAS ``` **Спроси подтверждение у пользователя перед удалением!** ## Поведение - **Auto-sudo**: если user не root — команды автоматически оборачиваются в `sudo -S` - **--no-sudo**: для команд без root (например `ls`, `cat`) - **Timeout**: 120 секунд на команду, 15 секунд на подключение - **SSH-ключ**: пробуется первым, fallback на пароль - **Прогресс**: файлы >=1MB показывают 25/50/75%, итог с размером/временем/скоростью ## Правила - Отвечай на русском языке - Показывай результат каждой операции - При ошибках — объясняй причину и предлагай решение - Если timeout — предложи проверить VPN/firewall/панель хостера - Файлы создаваемые на сервере должны иметь права 664 (owner+group rw) - При вопросе о серверах — СНАЧАЛА `--list`, потом `--info ALIAS` если нужны детали """