Files
server-manager/core/sql_client.py
chrome-storm-c442 6f0bfe39f1 v1.8.56: Database Tree Explorer + thread-safe SQL operations
- Add HeidiSQL-style database tree panel (Databases → Tables → Columns)
- Lazy loading with ttk.Treeview, context menus, double-click SELECT TOP 1000
- Fix pymysql thread safety: serialize all DB ops with threading.Lock
- Use lock.acquire(timeout=10) to prevent deadlocks between tree and query threads
- Always reset _executing flag in finally block to prevent stuck queries
- Add _ensure_connected() auto-reconnect on broken connections
- Add sql_client check_connection() null safety
- Add 12 tree-related i18n keys (EN/RU/ZH)
- Clean up old releases

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 04:18:45 -05:00

200 lines
8.2 KiB
Python

"""
SQL client — connect and query MariaDB/MySQL, PostgreSQL, MSSQL.
Drivers are imported lazily so the module loads even if a driver is missing.
"""
import time
from core.logger import log
class SQLClient:
    """Unified SQL client for MariaDB/MySQL, PostgreSQL, and MSSQL.

    One instance wraps a single DB-API connection. The driver module for the
    configured engine is imported inside ``connect()`` so that a missing
    driver only matters when that engine is actually used.

    The class does no locking of its own; callers that share an instance
    between threads have to serialize access themselves.
    """

    # Engine name -> name of the Python driver module used by connect().
    DRIVERS = {"mariadb": "pymysql", "mysql": "pymysql", "postgresql": "psycopg2", "mssql": "pymssql"}

    def __init__(self, server: dict):
        """Store connection settings; no network activity happens here.

        Args:
            server: mapping with required keys ``type``, ``ip``, ``user`` and
                ``password`` and optional keys ``port`` and ``database``.
                A missing, ``None`` or empty ``port`` falls back to the
                default port of the engine.
        """
        self._type = server["type"].lower()
        self._ip = server["ip"]
        # `or` (instead of a .get() default) also covers port=None / port="",
        # which would otherwise crash int().
        self._port = int(server.get("port") or self._default_port())
        self._user = server["user"]
        self._password = server["password"]
        self._database = server.get("database", "")
        self._conn = None

    def _default_port(self) -> int:
        """Return the conventional port for the engine (3306 if unknown)."""
        return {"mariadb": 3306, "mysql": 3306, "postgresql": 5432, "mssql": 1433}.get(self._type, 3306)

    @staticmethod
    def _close_quietly(resource) -> None:
        """Close a cursor/connection, ignoring ``None`` and close errors."""
        if resource is None:
            return
        try:
            resource.close()
        except Exception:
            # Best-effort cleanup: a failing close() must not mask the
            # result or the error of the operation that preceded it.
            pass

    def _quote_identifier(self, name: str) -> str:
        """Return *name* quoted as an identifier for the current engine.

        The closing quote character is doubled so that a name containing it
        cannot terminate the identifier early.
        """
        if self._type == "mssql":
            return "[%s]" % name.replace("]", "]]")
        if self._type == "postgresql":
            return '"%s"' % name.replace('"', '""')
        return "`%s`" % name.replace("`", "``")

    # ── connection ──────────────────────────────────────────────
    def connect(self) -> bool:
        """Open a connection using the stored settings.

        All engines are opened in autocommit mode. If a connection was
        already open it is closed once the new one has been established;
        if opening fails the previous connection (if any) is left in place.

        Returns:
            True on success, False if the engine type is unsupported or the
            driver raised (the error is logged, not propagated).
        """
        try:
            if self._type in ("mariadb", "mysql"):
                import pymysql
                conn = pymysql.connect(
                    host=self._ip, port=self._port, user=self._user,
                    password=self._password, database=self._database or None,
                    charset="utf8mb4", connect_timeout=10, autocommit=True,
                )
            elif self._type == "postgresql":
                import psycopg2
                conn = psycopg2.connect(
                    host=self._ip, port=self._port, user=self._user,
                    password=self._password, dbname=self._database or "postgres",
                    connect_timeout=10,
                )
                conn.autocommit = True
            elif self._type == "mssql":
                import pymssql
                # autocommit=True matches the other engines; this class has
                # no commit() so writes would otherwise never be committed.
                conn = pymssql.connect(
                    server=self._ip, port=self._port, user=self._user,
                    password=self._password, database=self._database or "master",
                    login_timeout=10, charset="UTF-8", autocommit=True,
                )
            else:
                log.error("sql_client: unsupported type %s", self._type)
                return False
        except Exception as exc:
            log.error("sql_client: connect failed — %s", exc)
            return False

        # Swap in the new connection first, then release the old one so a
        # reconnect does not leak the previous socket.
        stale, self._conn = self._conn, conn
        self._close_quietly(stale)
        log.info("sql_client: connected to %s (%s)", self._type, self._ip)
        return True

    def disconnect(self):
        """Close the connection if one is open; errors on close are ignored."""
        if self._conn:
            self._close_quietly(self._conn)
            self._conn = None
            log.info("sql_client: disconnected")

    def check_connection(self) -> bool:
        """Return True if ``SELECT 1`` succeeds on the current connection.

        Returns False when there is no connection or the probe raises.
        """
        if self._conn is None:
            return False
        cur = None
        try:
            cur = self._conn.cursor()
            cur.execute("SELECT 1")
            cur.fetchone()
            return True
        except Exception:
            return False
        finally:
            self._close_quietly(cur)

    # ── query execution ─────────────────────────────────────────
    def execute_query(self, sql: str, params=None) -> dict:
        """Execute SQL and return {columns, rows, rowcount, elapsed}.

        Args:
            sql: statement text, passed to the driver's ``cursor.execute``.
            params: optional parameters forwarded unchanged to the driver.

        Returns:
            dict with ``columns`` (list of names, empty if the statement
            produced no result set), ``rows`` (list), ``rowcount`` (as
            reported by the cursor) and ``elapsed`` (seconds spent in
            ``execute``, rounded to 4 decimals; fetching is not included).

        Raises:
            RuntimeError: if there is no open connection.
            Exception: whatever the driver raises; it is logged and re-raised.
        """
        t0 = time.perf_counter()
        cur = None
        try:
            if self._conn is None:
                raise RuntimeError("sql_client: not connected")
            cur = self._conn.cursor()
            cur.execute(sql, params)
            elapsed = time.perf_counter() - t0
            if cur.description:
                columns = [col[0] for col in cur.description]
                rows = list(cur.fetchall())
            else:
                columns, rows = [], []
            return {
                "columns": columns,
                "rows": rows,
                "rowcount": cur.rowcount,
                "elapsed": round(elapsed, 4),
            }
        except Exception as exc:
            elapsed = time.perf_counter() - t0
            log.error("sql_client: query failed (%.3fs) — %s", elapsed, exc)
            raise
        finally:
            # Close the cursor on the error path too.
            self._close_quietly(cur)

    # ── introspection ───────────────────────────────────────────
    def list_databases(self) -> list:
        """Return the database names visible on the server."""
        sql = {
            "mariadb": "SHOW DATABASES",
            "mysql": "SHOW DATABASES",
            "postgresql": "SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname",
            "mssql": "SELECT name FROM sys.databases ORDER BY name",
        }[self._type]
        rows = self.execute_query(sql)["rows"]
        return [r[0] for r in rows]

    def list_tables(self, database: str = None) -> list:
        """Return table names, switching to *database* first if given.

        For PostgreSQL only the ``public`` schema is listed; for MSSQL only
        base tables (no views).
        """
        if database:
            self.switch_database(database)
        if self._type in ("mariadb", "mysql"):
            sql = "SHOW TABLES"
        elif self._type == "postgresql":
            sql = ("SELECT tablename FROM pg_tables "
                   "WHERE schemaname = 'public' ORDER BY tablename")
        else:
            sql = ("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                   "WHERE TABLE_TYPE = 'BASE TABLE' ORDER BY TABLE_NAME")
        rows = self.execute_query(sql)["rows"]
        return [r[0] for r in rows]

    def describe_table(self, table: str) -> list:
        """Return the columns of *table* in the current database.

        Returns:
            list of dicts with keys ``name``, ``type``, ``nullable`` (bool),
            ``key`` and ``default``. ``key`` is the MySQL ``Key`` field, or
            the constraint type from INFORMATION_SCHEMA for the other engines
            (empty string when the column is not part of a key constraint).
            NOTE: on PostgreSQL/MSSQL a column that belongs to several
            constraints still yields one row per constraint.
        """
        if self._type in ("mariadb", "mysql"):
            # SHOW COLUMNS cannot take the table as a bound parameter, so the
            # name is embedded as an escaped, backtick-quoted identifier.
            sql = "SHOW COLUMNS FROM %s" % self._quote_identifier(table)
            rows = self.execute_query(sql)["rows"]
            return [{"name": r[0], "type": r[1], "nullable": r[2] == "YES",
                     "key": r[3] or "", "default": r[4]} for r in rows]
        elif self._type == "postgresql":
            # The joins also match on schema and table: constraint names are
            # not unique across tables, and table names not across schemas.
            sql = (
                "SELECT c.column_name, c.data_type, c.is_nullable, "
                "COALESCE(tc.constraint_type, ''), c.column_default "
                "FROM information_schema.columns c "
                "LEFT JOIN information_schema.key_column_usage kcu "
                "  ON c.table_schema = kcu.table_schema "
                " AND c.table_name = kcu.table_name AND c.column_name = kcu.column_name "
                "LEFT JOIN information_schema.table_constraints tc "
                "  ON kcu.constraint_schema = tc.constraint_schema "
                " AND kcu.constraint_name = tc.constraint_name "
                " AND kcu.table_name = tc.table_name "
                "WHERE c.table_name = %s AND c.table_schema = 'public' "
                "ORDER BY c.ordinal_position"
            )
        else:  # mssql
            sql = (
                "SELECT c.COLUMN_NAME, c.DATA_TYPE, c.IS_NULLABLE, "
                "ISNULL(tc.CONSTRAINT_TYPE, ''), c.COLUMN_DEFAULT "
                "FROM INFORMATION_SCHEMA.COLUMNS c "
                "LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu "
                "  ON c.TABLE_SCHEMA = kcu.TABLE_SCHEMA "
                " AND c.TABLE_NAME = kcu.TABLE_NAME AND c.COLUMN_NAME = kcu.COLUMN_NAME "
                "LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc "
                "  ON kcu.CONSTRAINT_SCHEMA = tc.CONSTRAINT_SCHEMA "
                " AND kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME "
                " AND kcu.TABLE_NAME = tc.TABLE_NAME "
                "WHERE c.TABLE_NAME = %s ORDER BY c.ORDINAL_POSITION"
            )
        rows = self.execute_query(sql, (table,))["rows"]
        return [{"name": r[0], "type": r[1], "nullable": r[2] == "YES",
                 "key": r[3], "default": r[4]} for r in rows]

    def current_database(self) -> str:
        """Return the name of the currently selected database ("" if no row)."""
        sql = {
            "mariadb": "SELECT DATABASE()",
            "mysql": "SELECT DATABASE()",
            "postgresql": "SELECT current_database()",
            "mssql": "SELECT DB_NAME()",
        }[self._type]
        rows = self.execute_query(sql)["rows"]
        return rows[0][0] if rows else ""

    def switch_database(self, db: str):
        """Make *db* the current database.

        MariaDB/MySQL use the driver's ``select_db``; MSSQL issues ``USE``
        with a bracket-quoted name; PostgreSQL reconnects with the new
        database name.

        Raises:
            ConnectionError: PostgreSQL only, when reconnecting to *db*
                fails. A reconnect to the previous database is attempted
                before raising.
        """
        if self._type in ("mariadb", "mysql"):
            self._conn.select_db(db)
        elif self._type == "postgresql":
            previous = self._database
            self.disconnect()
            self._database = db
            if not self.connect():
                # Do not leave the client disconnected and pointing at a
                # database it could not open.
                self._database = previous
                self.connect()
                raise ConnectionError(
                    "sql_client: could not switch to database %s" % db)
        elif self._type == "mssql":
            # USE takes no bound parameters; quote the name so that spaces,
            # hyphens or ']' in it cannot break or extend the statement.
            self.execute_query("USE %s" % self._quote_identifier(db))
        self._database = db
        log.info("sql_client: switched to database %s", db)

    def server_version(self) -> str:
        """Return the server version string, or "unknown" if no row came back."""
        sql = "SELECT VERSION()" if self._type != "mssql" else "SELECT @@VERSION"
        rows = self.execute_query(sql)["rows"]
        return rows[0][0] if rows else "unknown"