""" Background status checker — parallel server pings for all connection types. """ import socket import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from typing import TYPE_CHECKING if TYPE_CHECKING: from core.server_store import ServerStore from core.ssh_client import SSHClientWrapper from core.logger import log # Types that support native check_connection() _SSH_TYPE = {"ssh"} _SQL_TYPES = {"mariadb", "mssql", "postgresql"} _REDIS_TYPE = {"redis"} _HTTP_TYPES = {"grafana", "prometheus", "winrm"} _TCP_TYPES = {"telnet", "rdp", "vnc"} class StatusChecker: def __init__(self, store: "ServerStore"): self.store = store self._running = False self._thread: threading.Thread | None = None self._gui_callback = None def start(self): if self._running: return self._running = True self._thread = threading.Thread(target=self._loop, daemon=True) self._thread.start() def stop(self): self._running = False if self._thread: self._thread.join(timeout=3.0) def set_gui_callback(self, callback): self._gui_callback = callback def check_one(self, server: dict) -> bool: """Check a single server based on its type.""" server_type = server.get("type", "ssh") if server_type in _SSH_TYPE: return self._check_ssh(server) if server_type in _SQL_TYPES: return self._check_sql(server) if server_type in _REDIS_TYPE: return self._check_redis(server) if server_type == "grafana": return self._check_http(server, "/api/health") if server_type == "prometheus": return self._check_http(server, "/-/healthy") if server_type == "winrm": return self._check_http(server, "/wsman") if server_type in _TCP_TYPES: return self._check_tcp(server) return False def _check_ssh(self, server: dict) -> bool: key_path = self.store.get_ssh_key_path() wrapper = SSHClientWrapper(server, key_path) return wrapper.check_connection() def _check_tcp(self, server: dict) -> bool: """Check TCP connectivity (telnet, RDP, VNC).""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) sock.connect((server["ip"], server.get("port", 23))) sock.close() return True except Exception: return False def _check_sql(self, server: dict) -> bool: """Check SQL connectivity via SELECT 1.""" try: from core.sql_client import SQLClient client = SQLClient(server) result = client.connect() if result: ok = client.check_connection() client.disconnect() return ok return False except Exception: return False def _check_redis(self, server: dict) -> bool: """Check Redis via PING.""" try: from core.redis_client import RedisClient client = RedisClient(server) result = client.connect() # connect() уже делает ping() client.disconnect() return result except Exception: return False def _check_http(self, server: dict, path: str) -> bool: """Check HTTP(S) endpoint.""" try: import requests use_ssl = server.get("use_ssl", False) scheme = "https" if use_ssl else "http" url = f"{scheme}://{server['ip']}:{server.get('port', 80)}{path}" headers = {} api_token = server.get("api_token", "") if api_token: headers["Authorization"] = f"Bearer {api_token}" resp = requests.get(url, headers=headers, timeout=5, verify=False) return resp.status_code < 500 except Exception: return False def check_all_now(self): threading.Thread(target=self._check_cycle, daemon=True).start() def _loop(self): while self._running: self._check_cycle() interval = self.store.get_check_interval() for _ in range(interval * 10): if not self._running: return time.sleep(0.1) def _check_cycle(self): servers = self.store.get_all() # Mark skipped servers as disabled for s in servers: if s.get("skip_check", False): self.store.set_status(s["alias"], "disabled") checkable = [s for s in servers if not s.get("skip_check", False)] if not checkable: return # Parallel checks — up to 10 concurrent max_workers = min(10, len(checkable)) try: with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = { executor.submit(self.check_one, s): s["alias"] for s in checkable } for future in as_completed(futures, timeout=30): if not self._running: return alias = futures[future] try: online = future.result(timeout=10) self.store.set_status(alias, "online" if online else "offline") except Exception: self.store.set_status(alias, "offline") except Exception as e: log.warning(f"Status check cycle error: {e}") if self._gui_callback: try: self._gui_callback() except Exception: pass