Files
server-manager/core/server_store.py
chrome-storm-c442 42a6a876d3 Add Claude Code integration: shared config + Setup tab
- Shared servers.json at ~/.server-connections/ (GUI + Claude Code)
- Setup tab: one-click install of ssh.py, /ssh skill, SSH key
- Duplicate checks — safe to run multiple times
- tools/ssh.py + tools/skill-ssh.md bundled
- Updated README with integration docs (EN/RU/ZH)
- Deploy guide for new machines

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 09:01:22 -05:00

103 lines
3.3 KiB
Python

"""
Server store — CRUD + JSON persistence + observer pattern.
"""
import json
import os
import shutil
from typing import Callable, Optional
# Shared config — same file used by ssh.py and Claude Code /ssh skill
SHARED_DIR = os.path.expanduser("~/.server-connections")
SERVERS_FILE = os.path.join(SHARED_DIR, "servers.json")
# Fallback: local config dir (for example file)
LOCAL_CONFIG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config")
EXAMPLE_FILE = os.path.join(LOCAL_CONFIG_DIR, "servers.example.json")
SERVER_TYPES = ["ssh", "telnet", "rdp", "mariadb", "mssql", "postgresql"]
DEFAULT_PORTS = {
"ssh": 22,
"telnet": 23,
"rdp": 3389,
"mariadb": 3306,
"mssql": 1433,
"postgresql": 5432,
}
class ServerStore:
def __init__(self):
self._data: dict = {"servers": [], "ssh_key": {"type": "ed25519", "path": "~/.ssh/id_ed25519"}}
self._observers: list[Callable] = []
self._statuses: dict[str, str] = {} # alias -> "online" | "offline" | "unknown"
self._load()
def _load(self):
if os.path.exists(SERVERS_FILE):
with open(SERVERS_FILE, "r", encoding="utf-8") as f:
self._data = json.load(f)
elif os.path.exists(EXAMPLE_FILE):
with open(EXAMPLE_FILE, "r", encoding="utf-8") as f:
self._data = json.load(f)
self._save()
def _save(self):
os.makedirs(SHARED_DIR, exist_ok=True)
with open(SERVERS_FILE, "w", encoding="utf-8") as f:
json.dump(self._data, f, indent=2, ensure_ascii=False)
def _notify(self):
for cb in self._observers:
try:
cb()
except Exception:
pass
def subscribe(self, callback: Callable):
self._observers.append(callback)
def get_all(self) -> list[dict]:
return list(self._data.get("servers", []))
def get_server(self, alias: str) -> Optional[dict]:
for s in self._data.get("servers", []):
if s["alias"] == alias:
return dict(s)
return None
def add_server(self, server: dict):
if self.get_server(server["alias"]):
raise ValueError(f"Server '{server['alias']}' already exists")
self._data.setdefault("servers", []).append(server)
self._save()
self._notify()
def update_server(self, alias: str, updated: dict):
servers = self._data.get("servers", [])
for i, s in enumerate(servers):
if s["alias"] == alias:
servers[i] = updated
self._save()
self._notify()
return
raise ValueError(f"Server '{alias}' not found")
def remove_server(self, alias: str):
self._data["servers"] = [s for s in self._data.get("servers", []) if s["alias"] != alias]
self._statuses.pop(alias, None)
self._save()
self._notify()
def get_ssh_key_path(self) -> str:
path = self._data.get("ssh_key", {}).get("path", "~/.ssh/id_ed25519")
return os.path.expanduser(path)
# Status management
def set_status(self, alias: str, status: str):
self._statuses[alias] = status
def get_status(self, alias: str) -> str:
return self._statuses.get(alias, "unknown")