"""
Remote desktop launchers — RDP and VNC.

RemoteDesktopLauncher: external client launch (VNC, non-Windows RDP).
EmbeddedRDP: embed mstsc.exe inside a tkinter frame via SetParent (Windows only).
"""

import os
import platform
import subprocess
import tempfile
import threading
import time

from core.logger import log

try:
    import psutil
    _HAS_PSUTIL = True
except ImportError:
    _HAS_PSUTIL = False


# Characters that cannot appear in a Windows file name (also covers "/" on POSIX).
_INVALID_FILENAME_CHARS = frozenset('<>:"/\\|?*')

# Window class names used when searching for mstsc windows.
_MAIN_WINDOW_CLASS = "TscShellContainerClass"
_DIALOG_CLASS = "#32770"
_MSTSC_WINDOW_CLASSES = (_MAIN_WINDOW_CLASS, "RAIL_WINDOW", "IHWindowClass")


def _safe_filename_part(name, default: str) -> str:
    """Return *name* with characters that are invalid in file names replaced by "_".

    Falls back to *default* when *name* is empty or None, so the caller always
    gets a usable file-name fragment.
    """
    text = "" if name is None else str(name)
    cleaned = "".join(
        "_" if ch in _INVALID_FILENAME_CHARS or ord(ch) < 32 else ch
        for ch in text
    )
    return cleaned or default


class RemoteDesktopLauncher:
    """Launch external RDP/VNC clients for remote desktop connections."""

    @staticmethod
    def launch_rdp(server: dict) -> str:
        """Generate a .rdp temp file and launch the system RDP client.

        Args:
            server: Connection settings. Reads "ip" (required) and the optional
                keys "port" (default 3389), "user" (default "Administrator")
                and "alias" (default "remote").

        Returns:
            A human-readable status message describing what was launched, or
            why nothing could be launched.

        Raises:
            KeyError: If ``server`` has no "ip" entry.
        """
        hostname = server["ip"]
        port = server.get("port", 3389)
        user = server.get("user", "Administrator")
        alias = server.get("alias", "remote")

        rdp_content = (
            f"full address:s:{hostname}:{port}\r\n"
            f"username:s:{user}\r\n"
            "prompt for credentials:i:1\r\n"
            "screen mode id:i:2\r\n"
            "desktopwidth:i:1920\r\n"
            "desktopheight:i:1080\r\n"
            "session bpp:i:32\r\n"
            "compression:i:1\r\n"
            "disable wallpaper:i:0\r\n"
            "allow font smoothing:i:1\r\n"
            "networkautodetect:i:1\r\n"
            "bandwidthautodetect:i:1\r\n"
        )

        # The alias is user-supplied text; strip characters that would break
        # (or redirect) the temp-file path.
        rdp_file = os.path.join(
            tempfile.gettempdir(),
            f"sm_{_safe_filename_part(alias, 'remote')}.rdp",
        )
        # newline="" keeps the explicit "\r\n" endings as written; the default
        # text mode would turn them into "\r\r\n" on Windows.
        with open(rdp_file, "w", encoding="utf-8", newline="") as f:
            f.write(rdp_content)
        log.info(f"RDP file created: {rdp_file}")

        system = platform.system()
        if system == "Windows":
            os.startfile(rdp_file)
            return f"RDP launched via mstsc for {alias}"

        if system == "Linux":
            try:
                subprocess.Popen(
                    ["xfreerdp", f"/v:{hostname}:{port}", f"/u:{user}", "/dynamic-resolution"],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
                return f"RDP launched via xfreerdp for {alias}"
            except FileNotFoundError:
                log.warning("xfreerdp not found, trying rdesktop")
            try:
                subprocess.Popen(
                    ["rdesktop", f"{hostname}:{port}", "-u", user],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
                return f"RDP launched via rdesktop for {alias}"
            except FileNotFoundError:
                # Mirror launch_vnc: report the problem instead of raising.
                log.warning("rdesktop not found")
                return "No RDP client found. Install freerdp or rdesktop."

        if system == "Darwin":
            subprocess.Popen(["open", rdp_file])
            return f"RDP launched via macOS for {alias}"

        return f"Unsupported platform: {system}. RDP file saved to {rdp_file}"

    @staticmethod
    def launch_vnc(server: dict) -> str:
        """Launch a VNC viewer for the given server.

        Args:
            server: Connection settings. Reads "ip" (required) and the optional
                keys "port" (default 5900) and "alias" (default "remote").

        Returns:
            A human-readable status message describing what was launched, or
            that no viewer could be found.

        Raises:
            KeyError: If ``server`` has no "ip" entry.
        """
        hostname = server["ip"]
        port = server.get("port", 5900)
        alias = server.get("alias", "remote")
        target = f"{hostname}:{port}"
        log.info(f"VNC launching for {alias} at {target}")

        system = platform.system()
        if system == "Windows":
            # Well-known install locations first, then whatever is on PATH.
            viewers = [
                r"C:\Program Files\TightVNC\tvnviewer.exe",
                r"C:\Program Files (x86)\TightVNC\tvnviewer.exe",
                r"C:\Program Files\RealVNC\VNC Viewer\vncviewer.exe",
                r"C:\Program Files (x86)\RealVNC\VNC Viewer\vncviewer.exe",
            ]
            for viewer in viewers:
                if os.path.exists(viewer):
                    subprocess.Popen([viewer, target])
                    return f"VNC launched via {os.path.basename(viewer)} for {alias}"
            try:
                subprocess.Popen(["vncviewer", target])
                return f"VNC launched via vncviewer for {alias}"
            except FileNotFoundError:
                return "No VNC viewer found. Install TightVNC or RealVNC Viewer."

        if system == "Linux":
            for cmd in ["vncviewer", "xtigervncviewer", "remmina"]:
                # remmina takes a URI, the other viewers take host:port.
                args = [cmd, target] if cmd != "remmina" else [cmd, f"vnc://{target}"]
                try:
                    subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                    return f"VNC launched via {cmd} for {alias}"
                except FileNotFoundError:
                    continue
            return "No VNC viewer found. Install tigervnc-viewer or remmina."

        if system == "Darwin":
            subprocess.Popen(["open", f"vnc://{target}"])
            return f"VNC launched via macOS Screen Sharing for {alias}"

        return f"Unsupported platform: {system}"


# ── Embedded RDP (Windows only) ────────────────────────────────────

# Quality presets:
#   name -> (connection_type, bpp, disable_wallpaper, disable_themes, font_smooth, aero)
# The third and fourth fields feed the "disable ..." settings of the .rdp file,
# so 1 means the feature is turned OFF.
_QUALITY_PRESETS = {
    "auto": (7, 32, 0, 0, 1, 1),
    "lan": (6, 32, 0, 0, 1, 1),
    "broadband": (4, 24, 1, 0, 1, 0),
    "modem": (1, 16, 1, 1, 0, 0),
}


def _encrypt_rdp_password(password: str) -> str | None:
    """Encrypt password using DPAPI for .rdp auto-login (password 51:b:).

    Returns:
        The encrypted blob as a hex string, or None when not running on
        Windows, when *password* is empty, or when encryption fails.
    """
    if platform.system() != "Windows" or not password:
        return None
    try:
        import ctypes
        import ctypes.wintypes

        class DATA_BLOB(ctypes.Structure):
            _fields_ = [
                ("cbData", ctypes.wintypes.DWORD),
                ("pbData", ctypes.POINTER(ctypes.c_char)),
            ]

        crypt32 = ctypes.windll.crypt32
        kernel32 = ctypes.windll.kernel32

        # RDP expects UTF-16LE encoded password
        pwd_bytes = password.encode("utf-16-le")
        # Keep the buffer in a named local so it outlives the API call.
        pwd_buffer = ctypes.create_string_buffer(pwd_bytes, len(pwd_bytes))
        blob_in = DATA_BLOB(
            len(pwd_bytes),
            ctypes.cast(pwd_buffer, ctypes.POINTER(ctypes.c_char)),
        )
        blob_out = DATA_BLOB()

        ok = crypt32.CryptProtectData(
            ctypes.byref(blob_in), None, None, None, None, 0, ctypes.byref(blob_out)
        )
        if not ok:
            log.warning("DPAPI password encryption failed: CryptProtectData returned 0")
            return None
        try:
            enc_bytes = ctypes.string_at(blob_out.pbData, blob_out.cbData)
        finally:
            # The output buffer is allocated by the API and must always be freed.
            kernel32.LocalFree(blob_out.pbData)
        return enc_bytes.hex()
    except Exception as e:
        log.warning(f"DPAPI password encryption failed: {e}")
        return None


def _trust_rdp_server(hostname: str, port: int = 3389):
    """Pre-trust RDP server certificate via registry to suppress dialog.

    Best-effort: every failure is logged and swallowed. Does nothing on
    non-Windows platforms.

    Args:
        hostname: Server name/IP used as the registry sub-key.
        port: Accepted for interface compatibility; not used.

    NOTE(review): the AuthenticationLevelOverride value is written to the
    per-user "Terminal Server Client" key rather than the per-server key, so
    it presumably affects every RDP connection of this user and is never
    reverted here — confirm this is intended.
    """
    if platform.system() != "Windows":
        return
    try:
        import winreg
    except ImportError as e:
        log.warning(f"Failed to pre-trust RDP server: {e}")
        return

    client_path = "Software\\Microsoft\\Terminal Server Client"
    registry_writes = (
        # Create server entry — marks it as known
        (f"{client_path}\\Servers\\{hostname}", "UsernameHint", winreg.REG_SZ, ""),
        # AuthenticationLevelOverride = 0 (don't warn about certificates)
        (client_path, "AuthenticationLevelOverride", winreg.REG_DWORD, 0),
    )

    all_ok = True
    for key_path, value_name, value_type, value in registry_writes:
        try:
            # The context manager closes the key even if SetValueEx fails.
            with winreg.CreateKeyEx(
                winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_WRITE
            ) as key:
                winreg.SetValueEx(key, value_name, 0, value_type, value)
        except Exception as e:  # best-effort: never block the connection
            all_ok = False
            log.warning(f"Failed to pre-trust RDP server: {value_name}: {e}")

    if all_ok:
        log.info(f"RDP server pre-trusted in registry: {hostname}")


def _find_mstsc_pids(parent_pid: int, reference_time: float | None = None) -> list[int]:
    """Get all mstsc-related PIDs (parent + children via psutil, + global scan).

    Args:
        parent_pid: PID of the launched mstsc process; always included in the
            result.
        reference_time: Epoch timestamp the "recently started" scan is
            measured against; any mstsc process created within 10 seconds of
            it is included. Defaults to the current time. Passing the launch
            time keeps late calls anchored to the session's own start instead
            of picking up whatever mstsc was started just before the call.

    Returns:
        The PIDs found, in no particular order. Without psutil this is just
        ``[parent_pid]``.
    """
    pids = {parent_pid}
    if not _HAS_PSUTIL:
        return list(pids)

    # Children of the launched process
    try:
        parent = psutil.Process(parent_pid)
        for child in parent.children(recursive=True):
            if "mstsc" in child.name().lower():
                pids.add(child.pid)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        pass

    # Global scan for mstsc processes started close to the reference time (10s)
    anchor = time.time() if reference_time is None else reference_time
    for proc in psutil.process_iter(["pid", "name", "create_time"]):
        try:
            info = proc.info
            name = info["name"]
            if name and "mstsc" in name.lower():
                if abs(anchor - (info["create_time"] or 0)) < 10:
                    pids.add(info["pid"])
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass
    return list(pids)


def _enum_visible_windows(user32) -> list[tuple[int, str, str, int]]:
    """Return (hwnd, class_name, title, pid) for every visible top-level window.

    Args:
        user32: The loaded ``ctypes.windll.user32`` library.
    """
    import ctypes
    import ctypes.wintypes

    # The Win32 callback returns BOOL (a 4-byte int), not a 1-byte C bool.
    WNDENUMPROC = ctypes.WINFUNCTYPE(
        ctypes.wintypes.BOOL, ctypes.wintypes.HWND, ctypes.wintypes.LPARAM
    )
    windows: list[tuple[int, str, str, int]] = []

    def _collect(hwnd, _lparam):
        if not user32.IsWindowVisible(hwnd):
            return True
        class_buf = ctypes.create_unicode_buffer(256)
        user32.GetClassNameW(hwnd, class_buf, 256)
        title_buf = ctypes.create_unicode_buffer(512)
        user32.GetWindowTextW(hwnd, title_buf, 512)
        pid = ctypes.wintypes.DWORD()
        user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
        windows.append((hwnd, class_buf.value, title_buf.value, pid.value))
        return True  # keep enumerating

    user32.EnumWindows(WNDENUMPROC(_collect), 0)
    return windows


class EmbeddedRDP:
    """Embed mstsc.exe inside a tkinter frame using Win32 SetParent.

    Windows-only. On other platforms, raises NotImplementedError.

    The ``on_embedded`` / ``on_failed`` callbacks are invoked from the
    background embed thread started by ``launch()``, not from the caller's
    thread.
    """

    def __init__(self, server: dict, settings: dict | None = None):
        """Store connection data; nothing is launched until ``launch()``.

        Args:
            server: Connection settings ("ip" required; "port", "user",
                "password" and "alias" optional).
            settings: Optional session settings ("quality", "clipboard",
                "drives", "printers").

        Raises:
            NotImplementedError: When not running on Windows.
        """
        if platform.system() != "Windows":
            raise NotImplementedError("Embedded RDP is only supported on Windows")
        self.server = server
        self.settings = settings or {}
        self._process: subprocess.Popen | None = None
        self._mstsc_hwnd: int | None = None
        self._rdp_file: str | None = None
        self._connected = False
        self._parent_hwnd: int | None = None
        # Epoch time of the last launch(); anchors the mstsc PID scan.
        self._launch_time: float | None = None

        # Callbacks (set by GUI)
        self.on_embedded = None      # called when window is embedded
        self.on_failed = None        # called on embed failure
        self.on_disconnected = None  # called when mstsc exits

    def generate_rdp_file(self, width: int, height: int) -> str:
        """Build a .rdp temp file with all settings.

        Args:
            width: Desktop width written to the file.
            height: Desktop height written to the file.

        Returns:
            Path of the generated file (also stored for cleanup on disconnect).
        """
        s = self.server
        cfg = self.settings
        hostname = s["ip"]
        port = s.get("port", 3389)
        user = s.get("user", "Administrator")
        password = s.get("password", "")

        quality = cfg.get("quality", "auto")
        conn_type, bpp, no_wallpaper, no_themes, font_smooth, aero = _QUALITY_PRESETS.get(
            quality, _QUALITY_PRESETS["auto"]
        )

        clipboard = 1 if cfg.get("clipboard", True) else 0
        drives = "*" if cfg.get("drives", False) else ""
        printers = 1 if cfg.get("printers", False) else 0

        lines = [
            f"full address:s:{hostname}:{port}",
            f"username:s:{user}",
            f"desktopwidth:i:{width}",
            f"desktopheight:i:{height}",
            "screen mode id:i:1",          # windowed (required for embedding)
            "smart sizing:i:1",            # scale to window
            f"session bpp:i:{bpp}",
            f"connection type:i:{conn_type}",
            "compression:i:1",
            "networkautodetect:i:1",
            "bandwidthautodetect:i:1",
            f"disable wallpaper:i:{no_wallpaper}",
            f"disable themes:i:{no_themes}",
            f"allow font smoothing:i:{font_smooth}",
            "disable cursor setting:i:0",
            f"allow desktop composition:i:{aero}",
            # Window-drag and menu animations follow the wallpaper flag.
            f"disable full window drag:i:{no_wallpaper}",
            f"disable menu anims:i:{no_wallpaper}",
            f"redirectclipboard:i:{clipboard}",
            f"redirectprinters:i:{printers}",
            "redirectsmartcards:i:0",
            "autoreconnection enabled:i:1",
            "prompt for credentials:i:0",
            "negotiate security layer:i:1",
            "authentication level:i:0",    # no server-authentication warning
            "enablecredsspsupport:i:1",
            "promptcredentialonce:i:0",
            "enablerdsaadauth:i:0",
        ]
        if drives:
            lines.append(f"drivestoredirect:s:{drives}")

        # Auto-login: encrypt password via DPAPI
        enc_pwd = _encrypt_rdp_password(password)
        if enc_pwd:
            lines.append(f"password 51:b:{enc_pwd}")

        alias = self.server.get("alias", "rdp")
        # The alias is user-supplied text; strip characters that would break
        # (or redirect) the temp-file path.
        self._rdp_file = os.path.join(
            tempfile.gettempdir(),
            f"sm_embed_{_safe_filename_part(alias, 'rdp')}.rdp",
        )
        # newline="" keeps the explicit "\r\n" endings as written; the default
        # text mode would turn them into "\r\r\n" on Windows.
        with open(self._rdp_file, "w", encoding="utf-8", newline="") as f:
            f.write("\r\n".join(lines) + "\r\n")
        log.info(f"Embedded RDP file: {self._rdp_file}")
        return self._rdp_file

    def launch(self, parent_hwnd: int, width: int = 1024, height: int = 768):
        """Launch mstsc and start background embed thread.

        Args:
            parent_hwnd: Native window handle the mstsc window is embedded into.
            width: Initial width of the session window.
            height: Initial height of the session window.
        """
        self._parent_hwnd = parent_hwnd

        # Pre-trust server certificate to suppress dialog
        hostname = self.server["ip"]
        port = self.server.get("port", 3389)
        _trust_rdp_server(hostname, port)

        rdp_file = self.generate_rdp_file(width, height)
        self._launch_time = time.time()
        self._process = subprocess.Popen(
            ["mstsc.exe", rdp_file, f"/w:{width}", f"/h:{height}"],
            # 0x10 is CREATE_NEW_CONSOLE in the Win32 process creation flags.
            # NOTE(review): value kept from the original code; mstsc is a GUI
            # program, so the flag presumably has no visible effect — confirm.
            creationflags=0x00000010,
        )
        log.info(f"mstsc.exe launched, PID={self._process.pid}")

        threading.Thread(
            target=self._find_and_embed,
            args=(parent_hwnd, width, height),
            daemon=True,
        ).start()

    def _find_and_embed(self, parent_hwnd: int, width: int, height: int):
        """Background: poll for mstsc window using multi-strategy search, then embed.

        Strategy:
        1. Collect all mstsc PIDs (parent + children via psutil)
        2. Enumerate visible windows, matching by PID set OR by hostname in title
        3. Prefer TscShellContainerClass, fall back to dialogs (#32770)
        4. Two-phase: if a non-main window is embedded first, monitor for the
           main window

        Calls ``on_failed`` when no window shows up in time.
        """
        import ctypes

        user32 = ctypes.windll.user32

        # Work on a local reference: disconnect() may clear self._process
        # from another thread while this loop is still polling.
        process = self._process
        if process is None:
            return
        hostname = self.server["ip"]
        initial_pid = process.pid

        for attempt in range(100):  # 20 seconds max (100 * 200ms)
            if self._process is not process:
                log.info("Embed search cancelled: session was disconnected")
                return

            # The launched process exiting early is tolerated (the code
            # expects a surviving child mstsc); only give up once it has
            # exited AND 10s passed without any window.
            if process.poll() is not None and attempt >= 50:
                log.warning("mstsc parent exited and no window found after 10s")
                if self.on_failed:
                    self.on_failed("mstsc exited unexpectedly")
                return

            # Collect all candidate PIDs
            mstsc_pids = set(_find_mstsc_pids(initial_pid, self._launch_time))

            # Match by PID (any mstsc PID), or by hostname in the title when
            # the window class is one of the mstsc classes or a dialog
            # (fallback for child PID mismatch).
            found_windows = [
                (hwnd, cls, title, pid)
                for hwnd, cls, title, pid in _enum_visible_windows(user32)
                if pid in mstsc_pids
                or (
                    hostname in title
                    and (cls in _MSTSC_WINDOW_CLASSES or cls == _DIALOG_CLASS)
                )
            ]

            if not found_windows:
                time.sleep(0.2)
                continue

            log.info(f"Embed attempt {attempt}: found {len(found_windows)} mstsc windows")
            for hwnd, cls, title, pid in found_windows:
                log.info(f"  HWND={hwnd} class={cls!r} title={title!r} PID={pid}")

            # Prefer TscShellContainerClass (main RDP window)
            target = next(
                (w for w in found_windows if w[1] == _MAIN_WINDOW_CLASS), None
            )
            # Fallback: any non-dialog mstsc window
            if target is None:
                target = next(
                    (w for w in found_windows if w[1] != _DIALOG_CLASS), None
                )
            # Last resort: dialog (certificate warning etc.)
            if target is None:
                target = found_windows[0]
                log.info(f"Embedding dialog window HWND={target[0]}")

            target_hwnd, target_class = target[0], target[1]
            self._mstsc_hwnd = target_hwnd
            self._embed(parent_hwnd, width, height)

            # Phase 2: if we embedded something other than the main window,
            # watch for the real one
            if target_class != _MAIN_WINDOW_CLASS:
                self._watch_for_main_window(
                    parent_hwnd, width, height, initial_pid, hostname
                )
            return

        log.error("Timed out waiting for mstsc window")
        if self.on_failed:
            self.on_failed("Timed out waiting for RDP window")

    def _watch_for_main_window(self, parent_hwnd: int, width: int, height: int,
                               initial_pid: int, hostname: str):
        """Phase 2: after embedding a dialog, watch for TscShellContainerClass.

        Polls for up to 30 seconds and re-embeds as soon as a visible main
        window belonging to this session appears.
        """
        import ctypes

        user32 = ctypes.windll.user32

        log.info("Phase 2: watching for main RDP window after dialog...")
        dialog_close_logged = False

        for _ in range(150):  # 30 seconds for user to handle dialog
            time.sleep(0.2)

            if self._process is None:
                # disconnect() was called; do not embed anything any more.
                return

            # Check if current embedded window is still alive
            current = self._mstsc_hwnd
            if current and not user32.IsWindow(current) and not dialog_close_logged:
                log.info("Embedded dialog window closed, searching for main window...")
                dialog_close_logged = True  # log the transition once, not every poll

            mstsc_pids = set(_find_mstsc_pids(initial_pid, self._launch_time))
            found_main = next(
                (
                    hwnd
                    for hwnd, cls, title, pid in _enum_visible_windows(user32)
                    if cls == _MAIN_WINDOW_CLASS
                    and (pid in mstsc_pids or hostname in title)
                ),
                None,
            )

            if found_main and found_main != self._mstsc_hwnd:
                log.info(f"Phase 2: found main RDP window HWND={found_main}, re-embedding")
                self._mstsc_hwnd = found_main
                self._embed(parent_hwnd, width, height)
                return

        log.info("Phase 2: no main window appeared (dialog may have been cancelled)")

    def _embed(self, parent_hwnd: int, width: int, height: int) -> bool:
        """Reparent mstsc window into the tkinter frame.

        Calls ``on_embedded`` on success and ``on_failed`` on failure.

        Returns:
            True when the window was embedded, False otherwise.
        """
        import ctypes

        user32 = ctypes.windll.user32
        hwnd = self._mstsc_hwnd

        GWL_STYLE = -16
        WS_CHILD = 0x40000000
        WS_VISIBLE = 0x10000000
        WS_CAPTION = 0x00C00000
        WS_THICKFRAME = 0x00040000
        WS_POPUP = 0x80000000
        WS_SYSMENU = 0x00080000
        WS_MINIMIZEBOX = 0x00020000
        WS_MAXIMIZEBOX = 0x00010000
        SWP_NOSIZE = 0x0001
        SWP_NOMOVE = 0x0002
        SWP_NOZORDER = 0x0004
        SWP_FRAMECHANGED = 0x0020

        try:
            # ctypes does not raise when a Win32 call fails, so a vanished
            # window must be detected explicitly before reporting success.
            if not hwnd or not user32.IsWindow(hwnd):
                raise RuntimeError(f"window HWND={hwnd} no longer exists")

            # Remove decorations, make child
            style = user32.GetWindowLongW(hwnd, GWL_STYLE)
            style = (style | WS_CHILD | WS_VISIBLE) & ~(
                WS_POPUP | WS_CAPTION | WS_THICKFRAME
                | WS_SYSMENU | WS_MINIMIZEBOX | WS_MAXIMIZEBOX
            )
            user32.SetWindowLongW(hwnd, GWL_STYLE, style)

            # Reparent
            user32.SetParent(hwnd, parent_hwnd)

            # Resize to fill parent
            user32.MoveWindow(hwnd, 0, 0, width, height, True)

            # Apply style change without moving/resizing/restacking
            user32.SetWindowPos(
                hwnd, 0, 0, 0, 0, 0,
                SWP_FRAMECHANGED | SWP_NOZORDER | SWP_NOSIZE | SWP_NOMOVE,
            )

            # Focus
            user32.SetFocus(hwnd)

            self._connected = True
            log.info(f"mstsc embedded: HWND={hwnd} into parent={parent_hwnd}")
            if self.on_embedded:
                self.on_embedded()
            return True
        except Exception as e:
            log.error(f"Failed to embed mstsc: {e}")
            if self.on_failed:
                self.on_failed(str(e))
            return False

    def resize(self, width: int, height: int):
        """Resize the embedded mstsc window (no-op until a window is embedded)."""
        if not self._mstsc_hwnd or not self._connected:
            return
        try:
            import ctypes
            ctypes.windll.user32.MoveWindow(self._mstsc_hwnd, 0, 0, width, height, True)
        except Exception:
            # Best-effort: resize events are frequent and must never raise.
            pass

    def focus(self):
        """Give focus to the embedded mstsc window (no-op without a window)."""
        if not self._mstsc_hwnd:
            return
        try:
            import ctypes
            ctypes.windll.user32.SetFocus(self._mstsc_hwnd)
        except Exception:
            # Best-effort: focus changes must never raise into the GUI.
            pass

    def detach(self):
        """Detach mstsc from parent — for fullscreen.

        Reparents the window to the desktop, restores a normal window style
        and maximizes it. No-op until a window is embedded.
        """
        if not self._mstsc_hwnd or not self._connected:
            return
        try:
            import ctypes

            user32 = ctypes.windll.user32
            hwnd = self._mstsc_hwnd

            GWL_STYLE = -16
            WS_OVERLAPPEDWINDOW = 0x00CF0000
            WS_VISIBLE = 0x10000000
            SW_MAXIMIZE = 3
            SWP_NOSIZE = 0x0001
            SWP_NOMOVE = 0x0002
            SWP_NOZORDER = 0x0004
            SWP_FRAMECHANGED = 0x0020

            # Reparent to desktop
            user32.SetParent(hwnd, 0)

            # Restore normal window style
            user32.SetWindowLongW(hwnd, GWL_STYLE, WS_OVERLAPPEDWINDOW | WS_VISIBLE)

            # Maximize
            user32.ShowWindow(hwnd, SW_MAXIMIZE)
            user32.SetWindowPos(
                hwnd, 0, 0, 0, 0, 0,
                SWP_FRAMECHANGED | SWP_NOZORDER | SWP_NOSIZE | SWP_NOMOVE,
            )
            user32.SetForegroundWindow(hwnd)
            log.info("mstsc detached to fullscreen")
        except Exception as e:
            log.error(f"Detach failed: {e}")

    def reattach(self, parent_hwnd: int, width: int, height: int):
        """Re-embed mstsc back into the tkinter frame (no-op without a window)."""
        if not self._mstsc_hwnd:
            return
        # _embed reports failures itself; only claim success when it worked.
        if self._embed(parent_hwnd, width, height):
            log.info("mstsc reattached")

    def disconnect(self):
        """Terminate mstsc (parent + children) and clean up.

        Also removes the generated .rdp file. Safe to call more than once.
        """
        self._connected = False
        process = self._process

        # Kill all mstsc child processes
        if process and _HAS_PSUTIL:
            try:
                for pid in _find_mstsc_pids(process.pid, self._launch_time):
                    try:
                        psutil.Process(pid).terminate()
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        pass
            except Exception as e:  # best-effort cleanup
                log.warning(f"Failed to terminate mstsc processes: {e}")

        if process:
            try:
                process.terminate()
                process.wait(timeout=3)
            except Exception:
                # Graceful stop failed or timed out: force-kill as a fallback.
                try:
                    process.kill()
                except Exception as e:
                    log.warning(f"Failed to kill mstsc: {e}")

        self._process = None
        self._mstsc_hwnd = None

        if self._rdp_file and os.path.exists(self._rdp_file):
            try:
                os.remove(self._rdp_file)
            except OSError as e:
                log.warning(f"Failed to remove RDP file {self._rdp_file}: {e}")
        self._rdp_file = None
        log.info("EmbeddedRDP disconnected")

    def is_alive(self) -> bool:
        """Check if mstsc process is still running (parent or children).

        Returns:
            True when the launched process, another mstsc process tied to
            this launch, or the embedded window still exists.
        """
        process = self._process
        if not process:
            return False

        # Parent still alive
        if process.poll() is None:
            return True

        # Parent exited but child mstsc may be alive
        if _HAS_PSUTIL:
            try:
                for pid in _find_mstsc_pids(process.pid, self._launch_time):
                    if pid == process.pid:
                        continue
                    try:
                        if psutil.Process(pid).is_running():
                            return True
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        pass
            except Exception as e:  # fall through to the window check
                log.warning(f"mstsc process scan failed: {e}")

        # Check if embedded window still exists
        if self._mstsc_hwnd:
            try:
                import ctypes
                return bool(ctypes.windll.user32.IsWindow(self._mstsc_hwnd))
            except Exception:
                pass
        return False

    @property
    def connected(self) -> bool:
        """True once a window has been embedded and mstsc is still alive."""
        return self._connected and self.is_alive()