Files
server-manager/core/status_checker.py
2026-02-28 07:15:46 -05:00

175 lines
5.7 KiB
Python

"""
Background status checker — parallel server pings for all connection types.
"""
import socket
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.server_store import ServerStore
from core.ssh_client import SSHClientWrapper
from core.logger import log
# Types that support native check_connection()
_SSH_TYPE = {"ssh"}
_SQL_TYPES = {"mariadb", "mssql", "postgresql"}
_REDIS_TYPE = {"redis"}
_HTTP_TYPES = {"grafana", "prometheus", "winrm"}
_TCP_TYPES = {"telnet", "rdp", "vnc"}
class StatusChecker:
def __init__(self, store: "ServerStore"):
self.store = store
self._running = False
self._thread: threading.Thread | None = None
self._gui_callback = None
def start(self):
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
def stop(self):
self._running = False
if self._thread:
self._thread.join(timeout=3.0)
def set_gui_callback(self, callback):
self._gui_callback = callback
def check_one(self, server: dict) -> bool:
"""Check a single server based on its type."""
server_type = server.get("type", "ssh")
if server_type in _SSH_TYPE:
return self._check_ssh(server)
if server_type in _SQL_TYPES:
return self._check_sql(server)
if server_type in _REDIS_TYPE:
return self._check_redis(server)
if server_type == "grafana":
return self._check_http(server, "/api/health")
if server_type == "prometheus":
return self._check_http(server, "/-/healthy")
if server_type == "winrm":
return self._check_http(server, "/wsman")
if server_type in _TCP_TYPES:
return self._check_tcp(server)
return False
def _check_ssh(self, server: dict) -> bool:
key_path = self.store.get_ssh_key_path()
wrapper = SSHClientWrapper(server, key_path)
return wrapper.check_connection()
def _check_tcp(self, server: dict) -> bool:
"""Check TCP connectivity (telnet, RDP, VNC)."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((server["ip"], server.get("port", 23)))
sock.close()
return True
except Exception:
return False
def _check_sql(self, server: dict) -> bool:
"""Check SQL connectivity via SELECT 1."""
try:
from core.sql_client import SQLClient
client = SQLClient(server)
result = client.connect()
if result:
ok = client.check_connection()
client.disconnect()
return ok
return False
except Exception:
return False
def _check_redis(self, server: dict) -> bool:
"""Check Redis via PING."""
try:
from core.redis_client import RedisClient
client = RedisClient(server)
result = client.connect() # connect() уже делает ping()
client.disconnect()
return result
except Exception:
return False
def _check_http(self, server: dict, path: str) -> bool:
"""Check HTTP(S) endpoint."""
try:
import requests
use_ssl = server.get("use_ssl", False)
scheme = "https" if use_ssl else "http"
url = f"{scheme}://{server['ip']}:{server.get('port', 80)}{path}"
headers = {}
api_token = server.get("api_token", "")
if api_token:
headers["Authorization"] = f"Bearer {api_token}"
resp = requests.get(url, headers=headers, timeout=5, verify=False)
return resp.status_code < 500
except Exception:
return False
def check_all_now(self):
threading.Thread(target=self._check_cycle, daemon=True).start()
def _loop(self):
while self._running:
self._check_cycle()
interval = self.store.get_check_interval()
for _ in range(interval * 10):
if not self._running:
return
time.sleep(0.1)
def _check_cycle(self):
servers = self.store.get_all()
# Mark skipped servers as disabled
for s in servers:
if s.get("skip_check", False):
self.store.set_status(s["alias"], "disabled")
checkable = [s for s in servers if not s.get("skip_check", False)]
if not checkable:
return
# Parallel checks — up to 10 concurrent
max_workers = min(10, len(checkable))
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(self.check_one, s): s["alias"]
for s in checkable
}
for future in as_completed(futures, timeout=30):
if not self._running:
return
alias = futures[future]
try:
online = future.result(timeout=10)
self.store.set_status(alias, "online" if online else "offline")
except Exception:
self.store.set_status(alias, "offline")
except Exception as e:
log.warning(f"Status check cycle error: {e}")
if self._gui_callback:
try:
self._gui_callback()
except Exception:
pass