185 lines
5.8 KiB
Python
185 lines
5.8 KiB
Python
"""
|
|
Redis client wrapper — duck-typed, lazy-imports redis module.
|
|
"""
|
|
|
|
from core.logger import log
|
|
|
|
_redis = None
|
|
|
|
|
|
def _get_redis():
|
|
global _redis
|
|
if _redis is None:
|
|
import redis as _r
|
|
_redis = _r
|
|
return _redis
|
|
|
|
|
|
class RedisClient:
|
|
"""Manage a single Redis connection. No ABC — duck typing."""
|
|
|
|
def __init__(self, server: dict):
|
|
self._host = server["ip"]
|
|
self._port = int(server.get("port", 6379))
|
|
self._password = server.get("password") or None
|
|
self._db = int(server.get("db_index", 0))
|
|
self._ssl = server.get("ssl", False) or server.get("use_ssl", False)
|
|
self._conn = None
|
|
|
|
# -- lifecycle --------------------------------------------------------
|
|
|
|
def connect(self) -> bool:
|
|
try:
|
|
r = _get_redis()
|
|
self._conn = r.Redis(
|
|
host=self._host,
|
|
port=self._port,
|
|
password=self._password,
|
|
db=self._db,
|
|
decode_responses=True,
|
|
socket_timeout=5,
|
|
socket_connect_timeout=5,
|
|
ssl=self._ssl,
|
|
)
|
|
self._conn.ping()
|
|
log.info("Redis connected %s:%s db=%s", self._host, self._port, self._db)
|
|
return True
|
|
except Exception as exc:
|
|
log.error("Redis connect failed: %s", exc)
|
|
self._conn = None
|
|
return False
|
|
|
|
def disconnect(self):
|
|
if self._conn is not None:
|
|
try:
|
|
self._conn.close()
|
|
except Exception:
|
|
pass
|
|
self._conn = None
|
|
log.info("Redis disconnected")
|
|
|
|
def check_connection(self) -> bool:
|
|
try:
|
|
return self._conn is not None and self._conn.ping()
|
|
except Exception:
|
|
return False
|
|
|
|
def select_db(self, db: int):
|
|
"""Switch to a different Redis database index."""
|
|
db = int(db)
|
|
if self._conn is not None and db != self._db:
|
|
self._conn.execute_command("SELECT", db)
|
|
self._db = db
|
|
|
|
# -- commands ---------------------------------------------------------
|
|
|
|
def execute(self, command: str) -> str:
|
|
"""Parse a raw command string, execute via redis-py, return formatted."""
|
|
if not self._conn:
|
|
return "[not connected]"
|
|
import shlex
|
|
try:
|
|
parts = shlex.split(command)
|
|
except ValueError:
|
|
parts = command.split() # fallback для незакрытых кавычек
|
|
if not parts:
|
|
return ""
|
|
try:
|
|
result = self._conn.execute_command(*parts)
|
|
return self._format(result)
|
|
except Exception as exc:
|
|
return f"(error) {exc}"
|
|
|
|
def info(self, section=None) -> dict:
|
|
if not self._conn:
|
|
return {}
|
|
try:
|
|
return self._conn.info(section) if section else self._conn.info()
|
|
except Exception as exc:
|
|
log.error("Redis INFO failed: %s", exc)
|
|
return {}
|
|
|
|
def dbsize(self) -> int:
|
|
if not self._conn:
|
|
return 0
|
|
try:
|
|
return self._conn.dbsize()
|
|
except Exception as exc:
|
|
log.error("Redis DBSIZE failed: %s", exc)
|
|
return 0
|
|
|
|
def keys(self, pattern: str = "*", count: int = 100) -> list[str]:
|
|
"""Return up to *count* keys matching *pattern* via SCAN."""
|
|
if not self._conn:
|
|
return []
|
|
result = []
|
|
try:
|
|
cursor = 0
|
|
while len(result) < count:
|
|
cursor, batch = self._conn.scan(cursor, match=pattern, count=count)
|
|
result.extend(batch)
|
|
if cursor == 0:
|
|
break
|
|
return result[:count]
|
|
except Exception as exc:
|
|
log.error("Redis SCAN failed: %s", exc)
|
|
return []
|
|
|
|
def get_type(self, key: str) -> str:
|
|
if not self._conn:
|
|
return "none"
|
|
try:
|
|
return self._conn.type(key)
|
|
except Exception:
|
|
return "none"
|
|
|
|
def get_ttl(self, key: str) -> int:
|
|
"""Return TTL in seconds (-1 no expiry, -2 key missing)."""
|
|
if not self._conn:
|
|
return -2
|
|
try:
|
|
return self._conn.ttl(key)
|
|
except Exception:
|
|
return -2
|
|
|
|
def get_value(self, key: str) -> str:
|
|
"""Auto-detect type and return a human-readable string."""
|
|
if not self._conn:
|
|
return "(not connected)"
|
|
try:
|
|
t = self.get_type(key)
|
|
if t == "string":
|
|
return self._conn.get(key) or ""
|
|
if t == "list":
|
|
items = self._conn.lrange(key, 0, 99)
|
|
return "\n".join(f"{i}) {v}" for i, v in enumerate(items))
|
|
if t == "set":
|
|
items = list(self._conn.sscan_iter(key, count=100))[:100]
|
|
return "\n".join(items)
|
|
if t == "hash":
|
|
data = self._conn.hgetall(key)
|
|
return "\n".join(f"{k} -> {v}" for k, v in data.items())
|
|
if t == "zset":
|
|
items = self._conn.zrange(key, 0, 99, withscores=True)
|
|
return "\n".join(f"{v} (score={s})" for v, s in items)
|
|
return f"(unknown type: {t})"
|
|
except Exception as exc:
|
|
return f"(error) {exc}"
|
|
|
|
# -- helpers ----------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _format(value) -> str:
|
|
if value is None:
|
|
return "(nil)"
|
|
if isinstance(value, bool):
|
|
return "OK" if value else "(error)"
|
|
if isinstance(value, int):
|
|
return f"(integer) {value}"
|
|
if isinstance(value, (list, tuple)):
|
|
if not value:
|
|
return "(empty list)"
|
|
lines = [f"{i + 1}) {RedisClient._format(v)}" for i, v in enumerate(value)]
|
|
return "\n".join(lines)
|
|
return str(value)
|