"""
WinRM client — execute PowerShell and CMD commands on remote Windows machines.
"""

from core.logger import log


class WinRMClient:
    """Remote Windows management via WinRM (pywinrm).

    The client is configured from a ``server`` dict. Keys read by this class:

    * ``ip`` (required) — host name or address used to build the endpoint.
    * ``use_ssl`` — HTTPS + ``ssl`` transport when true (default ``True``),
      otherwise HTTP + ``ntlm`` transport.
    * ``port`` — defaults to 5986 with SSL, 5985 without.
    * ``user`` — defaults to ``"Administrator"``.
    * ``password`` — defaults to an empty string.
    * ``alias`` — only used in log messages.
    """

    def __init__(self, server: dict):
        """Store the server configuration; no network I/O happens here."""
        self.server = server
        # pywinrm session object; None whenever the client is not connected.
        self._session = None

    def connect(self) -> bool:
        """Create a WinRM session and verify it by running ``hostname``.

        Returns:
            True if the verification command exits with status 0. False if it
            runs but exits non-zero; in that case the session is discarded so
            the client does not look connected afterwards.

        Raises:
            ImportError: pywinrm is not installed.
            KeyError: the server config has no ``"ip"`` entry.
            Exception: whatever the verification command raised (re-raised
                unchanged after the session has been cleared).
        """
        # Imported lazily so the module can be loaded without pywinrm.
        try:
            import winrm
        except ImportError as exc:
            log.error("pywinrm not installed. Run: pip install pywinrm")
            raise ImportError("pywinrm is required for WinRM connections") from exc

        hostname = self.server["ip"]
        use_ssl = self.server.get("use_ssl", True)
        port = self.server.get("port", 5986 if use_ssl else 5985)
        user = self.server.get("user", "Administrator")
        password = self.server.get("password", "")

        transport = "ssl" if use_ssl else "ntlm"
        scheme = "https" if use_ssl else "http"
        endpoint = f"{scheme}://{hostname}:{port}/wsman"

        log.info(f"WinRM connecting to {self.server.get('alias', '?')} via {transport}")

        # NOTE(review): certificate validation is disabled unconditionally,
        # so HTTPS connections are not protected against man-in-the-middle
        # attacks. Consider making this configurable.
        self._session = winrm.Session(
            target=endpoint,
            auth=(user, password),
            transport=transport,
            server_cert_validation="ignore",
        )

        # Verify the connection with a simple command.
        try:
            result = self._session.run_cmd("hostname")
        except Exception as exc:
            log.error(f"WinRM connection test failed: {exc}")
            self._session = None
            raise

        if result.status_code != 0:
            log.warning(f"WinRM connection test returned exit code {result.status_code}")
            # Keep state consistent with the False return value: without this
            # _ensure_session() would still accept the unverified session.
            self._session = None
            return False

        host = result.std_out.decode("utf-8", errors="replace").strip()
        log.info(f"WinRM connected to {host}")
        return True

    def disconnect(self):
        """Drop the WinRM session reference."""
        self._session = None
        log.debug("WinRM session cleared")

    def check_connection(self) -> bool:
        """Return True if a session exists and ``echo ok`` exits with status 0.

        Any exception raised while running the probe is treated as "not
        connected" and reported as False rather than propagated.
        """
        if self._session is None:
            return False
        try:
            result = self._session.run_cmd("echo ok")
        except Exception:
            return False
        return result.status_code == 0

    def _ensure_session(self):
        """Raise ConnectionError if no session has been established."""
        if self._session is None:
            raise ConnectionError("WinRM session not established. Call connect() first.")

    def _run(self, kind: str, payload: str) -> tuple[str, str, int]:
        """Run *payload* through the session's ``run_<kind>`` method.

        Shared implementation of exec_ps() and exec_cmd().

        Args:
            kind: ``"ps"`` or ``"cmd"``; selects ``run_ps`` / ``run_cmd`` on
                the session and the label used in log messages.
            payload: Script or command text to execute.

        Returns:
            (stdout, stderr, exit_code), with both streams decoded as UTF-8
            and undecodable bytes replaced.

        Raises:
            ConnectionError: no session is established.
            Exception: anything raised during execution or decoding, logged
                and re-raised unchanged.
        """
        self._ensure_session()
        label = f"exec_{kind}"
        # Only the first 120 characters are logged to keep debug output short.
        log.debug(f"WinRM {label}: {payload[:120]}...")
        try:
            result = getattr(self._session, f"run_{kind}")(payload)
            stdout = result.std_out.decode("utf-8", errors="replace")
            stderr = result.std_err.decode("utf-8", errors="replace")
            exit_code = result.status_code
            log.debug(f"WinRM {label} exit_code={exit_code}")
            return stdout, stderr, exit_code
        except Exception as exc:
            log.error(f"WinRM {label} failed: {exc}")
            raise

    def exec_ps(self, script: str) -> tuple[str, str, int]:
        """Execute a PowerShell script on the remote host.

        Returns:
            (stdout, stderr, exit_code)

        Raises:
            ConnectionError: connect() has not established a session.
            Exception: any error raised while running the script (re-raised
                after being logged).
        """
        return self._run("ps", script)

    def exec_cmd(self, command: str) -> tuple[str, str, int]:
        """Execute a CMD command on the remote host.

        Returns:
            (stdout, stderr, exit_code)

        Raises:
            ConnectionError: connect() has not established a session.
            Exception: any error raised while running the command (re-raised
                after being logged).
        """
        return self._run("cmd", command)