""" Redis client wrapper — duck-typed, lazy-imports redis module. """ from core.logger import log _redis = None def _get_redis(): global _redis if _redis is None: import redis as _r _redis = _r return _redis class RedisClient: """Manage a single Redis connection. No ABC — duck typing.""" def __init__(self, server: dict): self._host = server["ip"] self._port = int(server.get("port", 6379)) self._password = server.get("password") or None self._db = int(server.get("db_index", 0)) self._conn = None # -- lifecycle -------------------------------------------------------- def connect(self) -> bool: try: r = _get_redis() self._conn = r.Redis( host=self._host, port=self._port, password=self._password, db=self._db, decode_responses=True, socket_timeout=5, socket_connect_timeout=5, ) self._conn.ping() log.info("Redis connected %s:%s db=%s", self._host, self._port, self._db) return True except Exception as exc: log.error("Redis connect failed: %s", exc) self._conn = None return False def disconnect(self): if self._conn is not None: try: self._conn.close() except Exception: pass self._conn = None log.info("Redis disconnected") def check_connection(self) -> bool: try: return self._conn is not None and self._conn.ping() except Exception: return False # -- commands --------------------------------------------------------- def execute(self, command: str) -> str: """Parse a raw command string, execute via redis-py, return formatted.""" if not self._conn: return "[not connected]" parts = command.split() if not parts: return "" try: result = self._conn.execute_command(*parts) return self._format(result) except Exception as exc: return f"(error) {exc}" def info(self, section=None) -> dict: if not self._conn: return {} try: return self._conn.info(section) if section else self._conn.info() except Exception as exc: log.error("Redis INFO failed: %s", exc) return {} def dbsize(self) -> int: if not self._conn: return 0 try: return self._conn.dbsize() except Exception as exc: log.error("Redis DBSIZE failed: %s", exc) return 0 def keys(self, pattern: str = "*", count: int = 100) -> list[str]: """Return up to *count* keys matching *pattern* via SCAN.""" if not self._conn: return [] result = [] try: cursor = 0 while len(result) < count: cursor, batch = self._conn.scan(cursor, match=pattern, count=count) result.extend(batch) if cursor == 0: break return result[:count] except Exception as exc: log.error("Redis SCAN failed: %s", exc) return [] def get_type(self, key: str) -> str: if not self._conn: return "none" try: return self._conn.type(key) except Exception: return "none" def get_ttl(self, key: str) -> int: """Return TTL in seconds (-1 no expiry, -2 key missing).""" if not self._conn: return -2 try: return self._conn.ttl(key) except Exception: return -2 def get_value(self, key: str) -> str: """Auto-detect type and return a human-readable string.""" if not self._conn: return "(not connected)" try: t = self.get_type(key) if t == "string": return self._conn.get(key) or "" if t == "list": items = self._conn.lrange(key, 0, 99) return "\n".join(f"{i}) {v}" for i, v in enumerate(items)) if t == "set": items = list(self._conn.sscan_iter(key, count=100))[:100] return "\n".join(items) if t == "hash": data = self._conn.hgetall(key) return "\n".join(f"{k} -> {v}" for k, v in data.items()) if t == "zset": items = self._conn.zrange(key, 0, 99, withscores=True) return "\n".join(f"{v} (score={s})" for v, s in items) return f"(unknown type: {t})" except Exception as exc: return f"(error) {exc}" # -- helpers ---------------------------------------------------------- @staticmethod def _format(value) -> str: if value is None: return "(nil)" if isinstance(value, bool): return "OK" if value else "(error)" if isinstance(value, int): return f"(integer) {value}" if isinstance(value, (list, tuple)): if not value: return "(empty list)" lines = [f"{i + 1}) {RedisClient._format(v)}" for i, v in enumerate(value)] return "\n".join(lines) return str(value)