"""
SQL client — connect and query MariaDB/MySQL, PostgreSQL, MSSQL.

Drivers are imported lazily so the module loads even if a driver is missing.
"""

import time

from core.logger import log


class SQLClient:
    """Unified SQL client for MariaDB/MySQL, PostgreSQL, and MSSQL.

    The client wraps a single driver connection (``pymysql``, ``psycopg2`` or
    ``pymssql``) selected by ``server["type"]`` and exposes a small, uniform
    API for running queries and inspecting the schema.  All three drivers use
    the ``%s`` placeholder style for query parameters.
    """

    # Server type -> name of the driver module imported lazily in connect().
    DRIVERS = {
        "mariadb": "pymysql",
        "mysql": "pymysql",
        "postgresql": "psycopg2",
        "mssql": "pymssql",
    }

    def __init__(self, server: dict):
        """Store connection settings; no connection is opened here.

        Args:
            server: mapping with required keys ``type``, ``ip``, ``user`` and
                ``password``, and optional keys ``port`` (defaults to the
                standard port for the server type) and ``database``.

        Raises:
            KeyError: if a required key is missing.
        """
        self._type = server["type"].lower()
        self._ip = server["ip"]
        # _default_port() reads self._type, so it must be set first.
        self._port = int(server.get("port", self._default_port()))
        self._user = server["user"]
        self._password = server["password"]
        self._database = server.get("database", "")
        self._conn = None

    def _default_port(self) -> int:
        """Return the conventional port for the server type (3306 if unknown)."""
        return {
            "mariadb": 3306,
            "mysql": 3306,
            "postgresql": 5432,
            "mssql": 1433,
        }.get(self._type, 3306)

    @staticmethod
    def _close_quietly(resource) -> None:
        """Close a cursor/connection, ignoring errors raised by ``close()``."""
        try:
            resource.close()
        except Exception:
            # Best-effort cleanup: a failing close must not mask the
            # caller's own result or exception.
            pass

    def _quote_identifier(self, name: str) -> str:
        """Return *name* quoted as an identifier for the current server type.

        The closing quote character is doubled so that a name containing it
        cannot terminate the identifier early and inject SQL.
        """
        name = str(name)
        if self._type in ("mariadb", "mysql"):
            return "`" + name.replace("`", "``") + "`"
        if self._type == "mssql":
            return "[" + name.replace("]", "]]") + "]"
        return '"' + name.replace('"', '""') + '"'

    # ── connection ──────────────────────────────────────────────

    def connect(self) -> bool:
        """Open a connection using the stored settings.

        The new connection is opened before any existing one is closed, so a
        failed attempt leaves a previously working connection in place.

        Returns:
            True on success.  False if the server type is unsupported or the
            driver import / connection attempt raised (the error is logged).
        """
        try:
            if self._type in ("mariadb", "mysql"):
                import pymysql

                conn = pymysql.connect(
                    host=self._ip,
                    port=self._port,
                    user=self._user,
                    password=self._password,
                    database=self._database or None,
                    charset="utf8mb4",
                    connect_timeout=10,
                    autocommit=True,
                )
            elif self._type == "postgresql":
                import psycopg2

                conn = psycopg2.connect(
                    host=self._ip,
                    port=self._port,
                    user=self._user,
                    password=self._password,
                    dbname=self._database or "postgres",
                    connect_timeout=10,
                )
                try:
                    conn.autocommit = True
                except Exception:
                    # Do not leak a half-configured connection.
                    self._close_quietly(conn)
                    raise
            elif self._type == "mssql":
                import pymssql

                conn = pymssql.connect(
                    server=self._ip,
                    port=self._port,
                    user=self._user,
                    password=self._password,
                    database=self._database or "master",
                    login_timeout=10,
                    charset="UTF-8",
                )
            else:
                log.error("sql_client: unsupported type %s", self._type)
                return False
        except Exception as exc:
            # Boundary handler: covers a missing driver (ImportError) as well
            # as any driver-specific connection error.
            log.error("sql_client: connect failed — %s", exc)
            return False

        previous, self._conn = self._conn, conn
        if previous is not None:
            # Reconnecting on a live client: release the old connection.
            self._close_quietly(previous)
        log.info("sql_client: connected to %s (%s)", self._type, self._ip)
        return True

    def disconnect(self):
        """Close the connection if one is open; errors from close are ignored."""
        if self._conn is not None:
            self._close_quietly(self._conn)
            self._conn = None
            log.info("sql_client: disconnected")

    def check_connection(self) -> bool:
        """Return True if a ``SELECT 1`` round-trip succeeds on the connection."""
        if self._conn is None:
            return False
        cur = None
        try:
            cur = self._conn.cursor()
            cur.execute("SELECT 1")
            cur.fetchone()
            return True
        except Exception:
            return False
        finally:
            if cur is not None:
                self._close_quietly(cur)

    # ── query execution ─────────────────────────────────────────

    def execute_query(self, sql: str, params=None) -> dict:
        """Execute SQL and return {columns, rows, rowcount, elapsed}.

        Args:
            sql: statement text; use ``%s`` placeholders for values.
            params: optional sequence of values bound to the placeholders.

        Returns:
            dict with ``columns`` (list of column names, empty for statements
            that return no result set), ``rows`` (list of row tuples),
            ``rowcount`` (as reported by the cursor) and ``elapsed`` (seconds
            spent in ``execute``, rounded to 4 decimals; fetching is not
            included).

        Raises:
            Exception: whatever the driver raised (or the error caused by
                calling this without a connection); it is logged and
                re-raised unchanged.
        """
        t0 = time.perf_counter()
        cur = None
        try:
            cur = self._conn.cursor()
            cur.execute(sql, params)
            elapsed = time.perf_counter() - t0
            if cur.description:
                columns = [col[0] for col in cur.description]
                rows = cur.fetchall()
            else:
                columns, rows = [], []
            return {
                "columns": columns,
                "rows": list(rows),
                "rowcount": cur.rowcount,
                "elapsed": round(elapsed, 4),
            }
        except Exception as exc:
            elapsed = time.perf_counter() - t0
            log.error("sql_client: query failed (%.3fs) — %s", elapsed, exc)
            raise
        finally:
            # Close the cursor on both the success and the failure path.
            if cur is not None:
                self._close_quietly(cur)

    # ── introspection ───────────────────────────────────────────

    def list_databases(self) -> list:
        """Return the names of the databases visible on the server."""
        sql = {
            "mariadb": "SHOW DATABASES",
            "mysql": "SHOW DATABASES",
            "postgresql": "SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname",
            "mssql": "SELECT name FROM sys.databases ORDER BY name",
        }[self._type]
        rows = self.execute_query(sql)["rows"]
        return [r[0] for r in rows]

    def list_tables(self, database: str = None) -> list:
        """Return table names in the current (or given) database.

        If *database* is given the client switches to it first, and stays
        switched afterwards.  PostgreSQL results are limited to the
        ``public`` schema.
        """
        if database:
            self.switch_database(database)
        if self._type in ("mariadb", "mysql"):
            sql = "SHOW TABLES"
        elif self._type == "postgresql":
            sql = ("SELECT tablename FROM pg_tables "
                   "WHERE schemaname = 'public' ORDER BY tablename")
        else:
            sql = ("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                   "WHERE TABLE_TYPE = 'BASE TABLE' ORDER BY TABLE_NAME")
        rows = self.execute_query(sql)["rows"]
        return [r[0] for r in rows]

    def describe_table(self, table: str) -> list:
        """Return column metadata for *table*.

        Returns:
            list of dicts with keys ``name``, ``type``, ``nullable`` (bool),
            ``key`` (key/constraint type, ``""`` if none) and ``default``.
            On PostgreSQL and MSSQL a column that takes part in several
            constraints yields one entry per constraint.
        """
        if self._type in ("mariadb", "mysql"):
            # Identifiers cannot be bound as parameters, so the name is
            # quoted with its backticks escaped instead of interpolated raw.
            sql = "SHOW COLUMNS FROM " + self._quote_identifier(table)
            rows = self.execute_query(sql)["rows"]
        elif self._type == "postgresql":
            # The joins also match on schema and table: constraint and column
            # names are not unique across tables/schemas, and matching on the
            # name alone can attach another table's constraint to a column.
            sql = (
                "SELECT c.column_name, c.data_type, c.is_nullable, "
                "COALESCE(tc.constraint_type, ''), c.column_default "
                "FROM information_schema.columns c "
                "LEFT JOIN information_schema.key_column_usage kcu "
                "  ON c.table_schema = kcu.table_schema "
                " AND c.table_name = kcu.table_name "
                " AND c.column_name = kcu.column_name "
                "LEFT JOIN information_schema.table_constraints tc "
                "  ON kcu.constraint_schema = tc.constraint_schema "
                " AND kcu.constraint_name = tc.constraint_name "
                " AND kcu.table_name = tc.table_name "
                "WHERE c.table_name = %s AND c.table_schema = 'public' "
                "ORDER BY c.ordinal_position"
            )
            rows = self.execute_query(sql, (table,))["rows"]
        else:  # mssql
            sql = (
                "SELECT c.COLUMN_NAME, c.DATA_TYPE, c.IS_NULLABLE, "
                "ISNULL(tc.CONSTRAINT_TYPE, ''), c.COLUMN_DEFAULT "
                "FROM INFORMATION_SCHEMA.COLUMNS c "
                "LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu "
                "  ON c.TABLE_SCHEMA = kcu.TABLE_SCHEMA "
                " AND c.TABLE_NAME = kcu.TABLE_NAME "
                " AND c.COLUMN_NAME = kcu.COLUMN_NAME "
                "LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc "
                "  ON kcu.CONSTRAINT_SCHEMA = tc.CONSTRAINT_SCHEMA "
                " AND kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME "
                " AND kcu.TABLE_NAME = tc.TABLE_NAME "
                "WHERE c.TABLE_NAME = %s ORDER BY c.ORDINAL_POSITION"
            )
            rows = self.execute_query(sql, (table,))["rows"]
        return [
            {
                "name": r[0],
                "type": r[1],
                "nullable": r[2] == "YES",
                # MySQL reports an empty/NULL key for plain columns; the other
                # backends already coalesce to '' in SQL.
                "key": r[3] or "",
                "default": r[4],
            }
            for r in rows
        ]

    def current_database(self) -> str:
        """Return the name of the database the connection is using, or ""."""
        sql = {
            "mariadb": "SELECT DATABASE()",
            "mysql": "SELECT DATABASE()",
            "postgresql": "SELECT current_database()",
            "mssql": "SELECT DB_NAME()",
        }[self._type]
        rows = self.execute_query(sql)["rows"]
        return rows[0][0] if rows else ""

    def switch_database(self, db: str):
        """Make *db* the active database for subsequent queries.

        MySQL/MariaDB and MSSQL switch on the existing connection.  PostgreSQL
        cannot change database on a live connection, so a new connection is
        opened; if that fails the previous connection and database name are
        kept.

        Args:
            db: plain (unquoted) database name.

        Raises:
            ConnectionError: PostgreSQL only, when connecting to *db* fails.
            Exception: driver errors from the MySQL/MSSQL switch.
        """
        if self._type in ("mariadb", "mysql"):
            self._conn.select_db(db)
        elif self._type == "postgresql":
            previous_db = self._database
            self._database = db
            if not self.connect():
                # connect() leaves the old connection untouched on failure;
                # restore the matching database name and report the error
                # instead of claiming the switch succeeded.
                self._database = previous_db
                raise ConnectionError(
                    "sql_client: could not switch to database %s" % db
                )
        elif self._type == "mssql":
            # USE takes an identifier, not a bindable value: bracket-quote it
            # so the name cannot inject additional statements.
            self.execute_query("USE " + self._quote_identifier(db))
        self._database = db
        log.info("sql_client: switched to database %s", db)

    def server_version(self) -> str:
        """Return the server's version string, or "unknown" if none is returned."""
        sql = "SELECT VERSION()" if self._type != "mssql" else "SELECT @@VERSION"
        rows = self.execute_query(sql)["rows"]
        return rows[0][0] if rows else "unknown"