Files
server-manager/core/remote_desktop.py
chrome-storm-c442 4959004a3f v1.8.52: icons module, Windows SSH sanitization, embedded RDP improvements, UI polish
- Add core/icons.py — centralized icon text helper with emoji/symbol support
- Add Windows SSH command sanitization in ssh.py (Linux→Windows auto-translation)
- Improve embedded RDP: launch tab connect/disconnect, fullscreen toggle
- Refactor sidebar: cleaner server type badges
- Update server_dialog: adaptive fields per server type
- Add setup_openssh.bat tool
- Update skill-ssh.md and CLAUDE.md docs for Windows SSH support
- Cleanup old releases, add v1.8.48-v1.8.52

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 14:37:37 -05:00

881 lines
33 KiB
Python

"""
Remote desktop launchers — RDP and VNC.
RemoteDesktopLauncher: external client launch (VNC, non-Windows RDP).
EmbeddedRDP: embed mstsc.exe inside a tkinter frame via SetParent (Windows only).
"""
import os
import platform
import subprocess
import tempfile
import threading
import time
from core.logger import log
try:
import psutil
_HAS_PSUTIL = True
except ImportError:
_HAS_PSUTIL = False
class RemoteDesktopLauncher:
    """Launch external RDP/VNC clients for remote desktop connections.

    Both launchers take a ``server`` dict and return a human-readable status
    string. Keys read: ``ip`` (required), ``port``, ``alias`` (used in log and
    status text, and in the .rdp file name) and, for RDP only, ``user``.
    """

    @staticmethod
    def launch_rdp(server: dict) -> str:
        """Generate a .rdp temp file and launch the system RDP client.

        Windows opens the file with its associated handler, macOS hands it to
        ``open``, and Linux tries ``xfreerdp`` then ``rdesktop`` with the
        connection given on the command line.

        Returns:
            A status message. A missing Linux client is reported in the
            message instead of raising, matching ``launch_vnc``.

        Raises:
            KeyError: if ``server`` has no ``ip`` entry.
        """
        hostname = server["ip"]
        port = server.get("port", 3389)
        user = server.get("user", "Administrator")
        alias = server.get("alias", "remote")

        rdp_content = (
            f"full address:s:{hostname}:{port}\r\n"
            f"username:s:{user}\r\n"
            "prompt for credentials:i:1\r\n"
            "screen mode id:i:2\r\n"
            "desktopwidth:i:1920\r\n"
            "desktopheight:i:1080\r\n"
            "session bpp:i:32\r\n"
            "compression:i:1\r\n"
            "disable wallpaper:i:0\r\n"
            "allow font smoothing:i:1\r\n"
            "networkautodetect:i:1\r\n"
            "bandwidthautodetect:i:1\r\n"
        )
        rdp_file = os.path.join(tempfile.gettempdir(), f"sm_{alias}.rdp")
        # newline="" writes the explicit CRLFs untouched; default text mode on
        # Windows would translate each "\n" again and emit "\r\r\n".
        with open(rdp_file, "w", encoding="utf-8", newline="") as f:
            f.write(rdp_content)
        log.info(f"RDP file created: {rdp_file}")

        system = platform.system()
        if system == "Windows":
            os.startfile(rdp_file)
            return f"RDP launched via mstsc for {alias}"

        if system == "Linux":
            try:
                subprocess.Popen(
                    ["xfreerdp", f"/v:{hostname}:{port}", f"/u:{user}", "/dynamic-resolution"],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
                return f"RDP launched via xfreerdp for {alias}"
            except FileNotFoundError:
                log.warning("xfreerdp not found, trying rdesktop")
            try:
                subprocess.Popen(
                    ["rdesktop", f"{hostname}:{port}", "-u", user],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except FileNotFoundError:
                # Neither client is installed: report it like launch_vnc does
                # rather than letting the exception reach the caller.
                log.warning("rdesktop not found")
                return (
                    "No RDP client found. Install xfreerdp (FreeRDP) or rdesktop. "
                    f"RDP file saved to {rdp_file}"
                )
            return f"RDP launched via rdesktop for {alias}"

        if system == "Darwin":
            subprocess.Popen(["open", rdp_file])
            return f"RDP launched via macOS for {alias}"

        return f"Unsupported platform: {system}. RDP file saved to {rdp_file}"

    @staticmethod
    def launch_vnc(server: dict) -> str:
        """Launch a VNC viewer for the given server.

        Windows checks the known TightVNC/RealVNC install paths and then falls
        back to ``vncviewer`` on PATH; Linux tries ``vncviewer``,
        ``xtigervncviewer`` and ``remmina`` in that order; macOS opens a
        ``vnc://`` URL.

        Returns:
            A status message, including the "no viewer found" case.

        Raises:
            KeyError: if ``server`` has no ``ip`` entry.
        """
        hostname = server["ip"]
        port = server.get("port", 5900)
        alias = server.get("alias", "remote")
        target = f"{hostname}:{port}"
        log.info(f"VNC launching for {alias} at {target}")

        system = platform.system()
        if system == "Windows":
            viewers = [
                r"C:\Program Files\TightVNC\tvnviewer.exe",
                r"C:\Program Files (x86)\TightVNC\tvnviewer.exe",
                r"C:\Program Files\RealVNC\VNC Viewer\vncviewer.exe",
                r"C:\Program Files (x86)\RealVNC\VNC Viewer\vncviewer.exe",
            ]
            for viewer in viewers:
                if os.path.exists(viewer):
                    subprocess.Popen([viewer, target])
                    return f"VNC launched via {os.path.basename(viewer)} for {alias}"
            try:
                subprocess.Popen(["vncviewer", target])
                return f"VNC launched via vncviewer for {alias}"
            except FileNotFoundError:
                return "No VNC viewer found. Install TightVNC or RealVNC Viewer."

        if system == "Linux":
            for cmd in ["vncviewer", "xtigervncviewer", "remmina"]:
                # remmina takes a URL, the other viewers take host:port
                args = [cmd, target] if cmd != "remmina" else [cmd, f"vnc://{target}"]
                try:
                    subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                except FileNotFoundError:
                    continue
                return f"VNC launched via {cmd} for {alias}"
            return "No VNC viewer found. Install tigervnc-viewer or remmina."

        if system == "Darwin":
            subprocess.Popen(["open", f"vnc://{target}"])
            return f"VNC launched via macOS Screen Sharing for {alias}"

        return f"Unsupported platform: {system}"
# ── Embedded RDP (Windows only) ────────────────────────────────────
# Quality presets: name -> (connection_type, bpp, wallpaper, themes, font_smooth, aero)
_QUALITY_PRESETS = {
"auto": (7, 32, 0, 0, 1, 1),
"lan": (6, 32, 0, 0, 1, 1),
"broadband": (4, 24, 1, 0, 1, 0),
"modem": (1, 16, 1, 1, 0, 0),
}
def _encrypt_rdp_password(password: str) -> str | None:
"""Encrypt password using DPAPI for .rdp auto-login (password 51:b:)."""
if platform.system() != "Windows" or not password:
return None
try:
import ctypes
import ctypes.wintypes
class DATA_BLOB(ctypes.Structure):
_fields_ = [
("cbData", ctypes.wintypes.DWORD),
("pbData", ctypes.POINTER(ctypes.c_char)),
]
crypt32 = ctypes.windll.crypt32
kernel32 = ctypes.windll.kernel32
# RDP expects UTF-16LE encoded password
pwd_bytes = password.encode("utf-16-le")
blob_in = DATA_BLOB(len(pwd_bytes), ctypes.cast(ctypes.create_string_buffer(pwd_bytes, len(pwd_bytes)), ctypes.POINTER(ctypes.c_char)))
blob_out = DATA_BLOB()
if crypt32.CryptProtectData(ctypes.byref(blob_in), None, None, None, None, 0, ctypes.byref(blob_out)):
enc_bytes = ctypes.string_at(blob_out.pbData, blob_out.cbData)
kernel32.LocalFree(blob_out.pbData)
return enc_bytes.hex()
return None
except Exception as e:
log.warning(f"DPAPI password encryption failed: {e}")
return None
def _trust_rdp_server(hostname: str, port: int = 3389):
    """Pre-trust an RDP server via the registry to suppress the mstsc dialog.

    Best-effort and Windows-only: does nothing on other platforms, never
    raises, and logs a warning for each registry write that fails.

    Two writes are made under the current user's "Terminal Server Client" key:
    a ``Servers/<hostname>`` entry (marks the server as known) and
    ``AuthenticationLevelOverride = 0`` (don't warn about certificates).

    NOTE(security): AuthenticationLevelOverride is written at the client root
    key, not under the server entry, so it is not scoped to ``hostname``.

    Args:
        hostname: Server address used as the registry sub-key name.
        port: Accepted for call-site symmetry; currently unused because the
            registry entry is keyed by hostname only.
    """
    if platform.system() != "Windows":
        return
    try:
        import winreg
    except ImportError as e:
        log.warning(f"Failed to pre-trust RDP server: {e}")
        return

    client_path = "Software\\Microsoft\\Terminal Server Client"
    trusted = True

    # Create server entry — marks it as known. The `with` block closes the
    # handle even when a value operation fails.
    try:
        with winreg.CreateKeyEx(
            winreg.HKEY_CURRENT_USER,
            f"{client_path}\\Servers\\{hostname}",
            0,
            winreg.KEY_READ | winreg.KEY_WRITE,
        ) as key:
            try:
                # Keep a hint the user already has instead of blanking it.
                winreg.QueryValueEx(key, "UsernameHint")
            except FileNotFoundError:
                winreg.SetValueEx(key, "UsernameHint", 0, winreg.REG_SZ, "")
    except Exception as e:
        trusted = False
        log.warning(f"Failed to create RDP server registry entry for {hostname}: {e}")

    # Set AuthenticationLevelOverride = 0 (don't warn about certificates)
    try:
        with winreg.CreateKeyEx(
            winreg.HKEY_CURRENT_USER, client_path, 0, winreg.KEY_WRITE
        ) as key:
            winreg.SetValueEx(key, "AuthenticationLevelOverride", 0, winreg.REG_DWORD, 0)
    except Exception as e:
        trusted = False
        log.warning(f"Failed to set RDP AuthenticationLevelOverride: {e}")

    # Only claim success when both writes actually happened.
    if trusted:
        log.info(f"RDP server pre-trusted in registry: {hostname}")
def _find_mstsc_pids(parent_pid: int) -> list[int]:
    """Collect PIDs that may belong to the mstsc session started as parent_pid.

    ``parent_pid`` is always included. When psutil is available the result
    also contains mstsc-named descendants of that process and every
    mstsc-named process whose creation time lies within 10 seconds of now.
    """
    found = {parent_pid}
    if not _HAS_PSUTIL:
        return list(found)

    # Descendants of the process we launched
    try:
        descendants = psutil.Process(parent_pid).children(recursive=True)
        for child in descendants:
            if "mstsc" in child.name().lower():
                found.add(child.pid)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        pass

    # System-wide pass: mstsc processes created in the last 10 seconds
    now = time.time()
    for proc in psutil.process_iter(["pid", "name", "create_time"]):
        try:
            info = proc.info
            name = info["name"]
            if not name or "mstsc" not in name.lower():
                continue
            if abs(now - (info["create_time"] or 0)) < 10:
                found.add(info["pid"])
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

    return list(found)
class EmbeddedRDP:
"""Embed mstsc.exe inside a tkinter frame using Win32 SetParent.
Windows-only. On other platforms, raises NotImplementedError.
"""
def __init__(self, server: dict, settings: dict | None = None):
if platform.system() != "Windows":
raise NotImplementedError("Embedded RDP is only supported on Windows")
self.server = server
self.settings = settings or {}
self._process: subprocess.Popen | None = None
self._mstsc_hwnd: int | None = None
self._rdp_file: str | None = None
self._connected = False
self._parent_hwnd: int | None = None
self._is_detached = False # True when intentionally fullscreen
# Callbacks (set by GUI)
self.on_embedded = None # called when window is embedded
self.on_failed = None # called on embed failure
self.on_disconnected = None # called when mstsc exits
def generate_rdp_file(self, window_w: int, window_h: int) -> str:
"""Build a .rdp temp file with all settings.
window_w/window_h: physical frame size for embedding.
Session resolution comes from settings["resolution"] (e.g. "1920x1080" or "auto").
"""
s = self.server
cfg = self.settings
hostname = s["ip"]
port = s.get("port", 3389)
user = s.get("user", "Administrator")
password = s.get("password", "")
# Session resolution.
# "auto": use frame size — session matches the embed area exactly.
# Fixed (e.g. "1920x1080"): use that resolution, smart sizing scales
# the content to fit the frame.
resolution = cfg.get("resolution", "auto")
if resolution != "auto":
parts = resolution.split("x")
desk_w, desk_h = int(parts[0]), int(parts[1])
else:
desk_w, desk_h = window_w, window_h
quality = cfg.get("quality", "auto")
conn_type, bpp, no_wallpaper, no_themes, font_smooth, aero = _QUALITY_PRESETS.get(quality, _QUALITY_PRESETS["auto"])
clipboard = 1 if cfg.get("clipboard", True) else 0
drives = "*" if cfg.get("drives", False) else ""
printers = 1 if cfg.get("printers", False) else 0
lines = [
f"full address:s:{hostname}:{port}",
f"username:s:{user}",
f"desktopwidth:i:{desk_w}",
f"desktopheight:i:{desk_h}",
"screen mode id:i:1", # windowed (required for embedding)
"use multimon:i:0",
f"session bpp:i:{bpp}",
f"connection type:i:{conn_type}",
"compression:i:1",
"networkautodetect:i:1",
"bandwidthautodetect:i:1",
f"disable wallpaper:i:{no_wallpaper}",
f"disable themes:i:{no_themes}",
f"allow font smoothing:i:{font_smooth}",
f"disable cursor setting:i:0",
f"allow desktop composition:i:{aero}",
f"disable full window drag:i:{no_wallpaper}",
f"disable menu anims:i:{no_wallpaper}",
f"redirectclipboard:i:{clipboard}",
f"redirectprinters:i:{printers}",
"redirectsmartcards:i:0",
"autoreconnection enabled:i:1",
"prompt for credentials:i:0",
"negotiate security layer:i:1",
"authentication level:i:0", # no NLA warning
"enablecredsspsupport:i:1",
"promptcredentialonce:i:0",
"enablerdsaadauth:i:0",
]
# smart sizing scales the fixed-resolution bitmap to fit the mstsc window.
# No dynamic resolution — session resolution is set once at connect time.
lines.append("smart sizing:i:1")
if drives:
lines.append(f"drivestoredirect:s:{drives}")
# Auto-login: encrypt password via DPAPI
enc_pwd = _encrypt_rdp_password(password)
if enc_pwd:
lines.append(f"password 51:b:{enc_pwd}")
alias = self.server.get("alias", "rdp")
self._rdp_file = os.path.join(tempfile.gettempdir(), f"sm_embed_{alias}.rdp")
with open(self._rdp_file, "w", encoding="utf-8") as f:
f.write("\r\n".join(lines) + "\r\n")
log.info(f"Embedded RDP file: {self._rdp_file}")
return self._rdp_file
def launch(self, parent_hwnd: int, window_w: int = 1024, window_h: int = 768):
"""Launch mstsc and start background embed thread.
window_w/window_h: physical size of the embedding frame.
Session resolution is set via settings["resolution"] in the .rdp file.
"""
self._parent_hwnd = parent_hwnd
# Pre-trust server certificate to suppress dialog
hostname = self.server["ip"]
port = self.server.get("port", 3389)
_trust_rdp_server(hostname, port)
rdp_file = self.generate_rdp_file(window_w, window_h)
self._launch_time = time.time()
# Always launch mstsc at frame size — smart sizing scales the session
self._process = subprocess.Popen(
["mstsc.exe", rdp_file, f"/w:{window_w}", f"/h:{window_h}"],
creationflags=0x00000010, # CREATE_NEW_CONSOLE suppressed
)
log.info(f"mstsc.exe launched, PID={self._process.pid}")
threading.Thread(target=self._find_and_embed, args=(parent_hwnd, window_w, window_h), daemon=True).start()
def _find_and_embed(self, parent_hwnd: int, width: int, height: int):
"""Background: poll for mstsc window using multi-strategy search, then embed.
Strategy:
1. Collect all mstsc PIDs (parent + children via psutil)
2. EnumWindows matching by PID set OR by hostname in title
3. Prefer TscShellContainerClass, fall back to dialogs (#32770)
4. Two-phase: if dialog embedded first, monitor for main window
"""
import ctypes
import ctypes.wintypes
user32 = ctypes.windll.user32
WNDENUMPROC = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.wintypes.HWND, ctypes.wintypes.LPARAM)
hostname = self.server["ip"]
initial_pid = self._process.pid
for attempt in range(100): # 20 seconds max (100 * 200ms)
# Check if parent process exited — but don't bail immediately,
# mstsc spawns a child that survives the parent
parent_exited = self._process.poll() is not None
if parent_exited and attempt < 5:
# Parent exited fast (normal for mstsc), keep searching
pass
elif parent_exited and attempt >= 50:
log.warning("mstsc parent exited and no window found after 10s")
if self.on_failed:
self.on_failed("mstsc exited unexpectedly")
return
# Collect all candidate PIDs
mstsc_pids = set(_find_mstsc_pids(initial_pid))
found_windows = []
def enum_callback(hwnd, _lparam):
if not user32.IsWindowVisible(hwnd):
return True
class_buf = ctypes.create_unicode_buffer(256)
user32.GetClassNameW(hwnd, class_buf, 256)
class_name = class_buf.value
title_buf = ctypes.create_unicode_buffer(512)
user32.GetWindowTextW(hwnd, title_buf, 512)
title = title_buf.value
pid = ctypes.wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
win_pid = pid.value
# Match by PID (any mstsc PID)
pid_match = win_pid in mstsc_pids
# Match by hostname in title (fallback for child PID mismatch)
title_match = hostname in title
# Match by mstsc class names
is_mstsc_class = class_name in (
"TscShellContainerClass", "RAIL_WINDOW", "IHWindowClass"
)
is_dialog = class_name == "#32770"
if (pid_match or title_match) and (is_mstsc_class or is_dialog):
found_windows.append((hwnd, class_name, title, win_pid))
return True
user32.EnumWindows(WNDENUMPROC(enum_callback), 0)
if found_windows:
log.info(f"Embed attempt {attempt}: found {len(found_windows)} mstsc windows")
for hwnd, cls, title, pid in found_windows:
log.info(f" HWND={hwnd} class={cls!r} title={title!r} PID={pid}")
# Prefer TscShellContainerClass (main RDP window)
target_hwnd = None
for hwnd, cls, title, pid in found_windows:
if cls == "TscShellContainerClass":
target_hwnd = hwnd
break
# Fallback: any non-dialog mstsc window
if not target_hwnd:
for hwnd, cls, title, pid in found_windows:
if cls not in ("#32770",):
target_hwnd = hwnd
break
# Last resort: dialog (certificate warning etc.)
if not target_hwnd:
target_hwnd = found_windows[0][0]
log.info(f"Embedding dialog window HWND={target_hwnd}")
self._mstsc_hwnd = target_hwnd
self._embed(parent_hwnd, width, height)
# Phase 2: if we embedded a dialog, watch for the real window
embedded_class = None
for hwnd, cls, title, pid in found_windows:
if hwnd == target_hwnd:
embedded_class = cls
break
if embedded_class != "TscShellContainerClass":
self._watch_for_main_window(
parent_hwnd, width, height,
initial_pid, hostname
)
return
time.sleep(0.2)
log.error("Timed out waiting for mstsc window")
if self.on_failed:
self.on_failed("Timed out waiting for RDP window")
def _watch_for_main_window(self, parent_hwnd: int, width: int, height: int,
initial_pid: int, hostname: str):
"""Phase 2: after embedding a dialog, watch for TscShellContainerClass."""
import ctypes
import ctypes.wintypes
user32 = ctypes.windll.user32
WNDENUMPROC = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.wintypes.HWND, ctypes.wintypes.LPARAM)
log.info("Phase 2: watching for main RDP window after dialog...")
for _ in range(150): # 30 seconds for user to handle dialog
time.sleep(0.2)
# Check if current embedded window is still alive
if self._mstsc_hwnd and not user32.IsWindow(self._mstsc_hwnd):
log.info("Embedded dialog window closed, searching for main window...")
mstsc_pids = set(_find_mstsc_pids(initial_pid))
found_main = None
def enum_callback(hwnd, _lparam):
nonlocal found_main
if not user32.IsWindowVisible(hwnd):
return True
class_buf = ctypes.create_unicode_buffer(256)
user32.GetClassNameW(hwnd, class_buf, 256)
if class_buf.value != "TscShellContainerClass":
return True
pid = ctypes.wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
title_buf = ctypes.create_unicode_buffer(512)
user32.GetWindowTextW(hwnd, title_buf, 512)
if pid.value in mstsc_pids or hostname in title_buf.value:
found_main = hwnd
return False
return True
user32.EnumWindows(WNDENUMPROC(enum_callback), 0)
if found_main and found_main != self._mstsc_hwnd:
log.info(f"Phase 2: found main RDP window HWND={found_main}, re-embedding")
self._mstsc_hwnd = found_main
self._embed(parent_hwnd, width, height)
return
log.info("Phase 2: no main window appeared (dialog may have been cancelled)")
def _embed(self, parent_hwnd: int, width: int, height: int):
"""Reparent mstsc window into the tkinter frame."""
import ctypes
import ctypes.wintypes
self._parent_hwnd = parent_hwnd
user32 = ctypes.windll.user32
kernel32 = ctypes.windll.kernel32
hwnd = self._mstsc_hwnd
GWL_STYLE = -16
GWL_EXSTYLE = -20
WS_CHILD = 0x40000000
WS_VISIBLE = 0x10000000
WS_CAPTION = 0x00C00000
WS_THICKFRAME = 0x00040000
WS_POPUP = 0x80000000
WS_SYSMENU = 0x00080000
WS_MINIMIZEBOX = 0x00020000
WS_MAXIMIZEBOX = 0x00010000
WS_EX_APPWINDOW = 0x00040000
WS_EX_WINDOWEDGE = 0x00000100
SWP_FRAMECHANGED = 0x0020
SWP_NOZORDER = 0x0004
try:
# Attach thread input queues — required for cross-process SetParent
target_tid = user32.GetWindowThreadProcessId(hwnd, None)
our_tid = kernel32.GetCurrentThreadId()
attached = False
if target_tid != our_tid:
attached = bool(user32.AttachThreadInput(our_tid, target_tid, True))
log.info(f"AttachThreadInput({our_tid}, {target_tid}): {attached}")
# Remove decorations, make child
# Use unsigned 32-bit mask to avoid ctypes overflow
style = user32.GetWindowLongW(hwnd, GWL_STYLE) & 0xFFFFFFFF
new_style = (style | WS_CHILD | WS_VISIBLE) & ~(
WS_POPUP | WS_CAPTION | WS_THICKFRAME |
WS_SYSMENU | WS_MINIMIZEBOX | WS_MAXIMIZEBOX
) & 0xFFFFFFFF
user32.SetWindowLongW(hwnd, GWL_STYLE, ctypes.c_long(new_style).value)
# Also strip extended styles that keep it on taskbar
ex_style = user32.GetWindowLongW(hwnd, GWL_EXSTYLE) & 0xFFFFFFFF
new_ex = (ex_style & ~(WS_EX_APPWINDOW | WS_EX_WINDOWEDGE)) & 0xFFFFFFFF
user32.SetWindowLongW(hwnd, GWL_EXSTYLE, ctypes.c_long(new_ex).value)
# Reparent
result = user32.SetParent(hwnd, parent_hwnd)
log.info(f"SetParent(hwnd={hwnd}, parent={parent_hwnd}) = {result}")
if not result:
# SetParent failed — try alternative: set owner instead
error = kernel32.GetLastError()
log.warning(f"SetParent failed, GetLastError={error}. Trying ShowWindow approach.")
# Force the window into position over our frame anyway
user32.SetWindowPos(hwnd, 0, 0, 0, width, height,
SWP_FRAMECHANGED | SWP_NOZORDER)
else:
# Resize to fill parent
user32.MoveWindow(hwnd, 0, 0, width, height, True)
# Apply style change — recalculates non-client area
user32.SetWindowPos(hwnd, 0, 0, 0, 0, 0,
SWP_FRAMECHANGED | SWP_NOZORDER | 0x0001 | 0x0002)
# Re-position after FRAMECHANGED to fix non-client area offset
user32.MoveWindow(hwnd, 0, 0, width, height, True)
# Focus
user32.SetFocus(hwnd)
# Detach thread input
if attached:
user32.AttachThreadInput(our_tid, target_tid, False)
self._connected = True
log.info(f"mstsc embedded: HWND={hwnd} into parent={parent_hwnd} (SetParent={result})")
if self.on_embedded:
self.on_embedded()
except Exception as e:
log.error(f"Failed to embed mstsc: {e}")
if self.on_failed:
self.on_failed(str(e))
def resize(self, width: int, height: int):
"""Resize the embedded mstsc window."""
if not self._mstsc_hwnd or not self._connected:
return
if width < 200 or height < 150:
return # Ignore degenerate sizes
try:
import ctypes
user32 = ctypes.windll.user32
if not user32.IsWindow(self._mstsc_hwnd):
return
user32.MoveWindow(self._mstsc_hwnd, 0, 0, width, height, True)
except Exception:
pass
def focus(self):
"""Give focus to the embedded mstsc window."""
if not self._mstsc_hwnd:
return
try:
import ctypes
ctypes.windll.user32.SetFocus(self._mstsc_hwnd)
except Exception:
pass
def detach(self):
"""Detach mstsc from parent (step 1). Call maximize() after a delay."""
if not self._mstsc_hwnd or not self._connected:
return
self._is_detached = True
try:
import ctypes
import ctypes.wintypes
user32 = ctypes.windll.user32
kernel32 = ctypes.windll.kernel32
hwnd = self._mstsc_hwnd
# Thread input attachment for cross-process style changes
target_tid = user32.GetWindowThreadProcessId(hwnd, None)
our_tid = kernel32.GetCurrentThreadId()
attached = False
if target_tid != our_tid:
attached = bool(user32.AttachThreadInput(our_tid, target_tid, True))
# Reparent to desktop
user32.SetParent(hwnd, 0)
# Remove WS_CHILD, set normal top-level window style
GWL_STYLE = -16
WS_OVERLAPPEDWINDOW = 0x00CF0000
WS_VISIBLE = 0x10000000
WS_CHILD = 0x40000000
style = user32.GetWindowLongW(hwnd, GWL_STYLE) & 0xFFFFFFFF
new_style = ((style & ~WS_CHILD) | WS_OVERLAPPEDWINDOW | WS_VISIBLE) & 0xFFFFFFFF
user32.SetWindowLongW(hwnd, GWL_STYLE, ctypes.c_long(new_style).value)
# Apply frame change
SWP_FRAMECHANGED = 0x0020
SWP_NOZORDER = 0x0004
SWP_NOSIZE = 0x0001
SWP_NOMOVE = 0x0002
user32.SetWindowPos(hwnd, 0, 0, 0, 0, 0,
SWP_FRAMECHANGED | SWP_NOZORDER | SWP_NOSIZE | SWP_NOMOVE)
if attached:
user32.AttachThreadInput(our_tid, target_tid, False)
log.info("mstsc detached from parent")
except Exception as e:
log.error(f"Detach failed: {e}")
def maximize(self):
"""Maximize the detached mstsc window (step 2, call after delay)."""
if not self._mstsc_hwnd:
return
try:
import ctypes
user32 = ctypes.windll.user32
hwnd = self._mstsc_hwnd
# Bring to foreground
user32.SetForegroundWindow(hwnd)
# Try ShowWindow first
user32.ShowWindow(hwnd, 3) # SW_MAXIMIZE
# Fallback: explicit resize to screen
screen_w = user32.GetSystemMetrics(0)
screen_h = user32.GetSystemMetrics(1)
user32.MoveWindow(hwnd, 0, 0, screen_w, screen_h, True)
log.info(f"mstsc maximize attempted ({screen_w}x{screen_h})")
except Exception as e:
log.error(f"Maximize failed: {e}")
def reattach(self, parent_hwnd: int, width: int, height: int):
"""Re-embed mstsc back into the tkinter frame."""
if not self._mstsc_hwnd:
return
self._is_detached = False
self._embed(parent_hwnd, width, height)
log.info("mstsc reattached")
def disconnect(self):
"""Terminate mstsc (parent + children) and clean up."""
self._connected = False
# Kill all mstsc child processes
if self._process and _HAS_PSUTIL:
try:
pids = _find_mstsc_pids(self._process.pid)
for pid in pids:
try:
p = psutil.Process(pid)
p.terminate()
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
except Exception:
pass
if self._process:
try:
self._process.terminate()
self._process.wait(timeout=3)
except Exception:
try:
self._process.kill()
except Exception:
pass
self._process = None
self._mstsc_hwnd = None
if self._rdp_file and os.path.exists(self._rdp_file):
try:
os.remove(self._rdp_file)
except Exception:
pass
self._rdp_file = None
log.info("EmbeddedRDP disconnected")
def try_reembed(self, parent_hwnd: int, width: int, height: int) -> bool:
"""Try to re-embed the mstsc window after reconnect.
Handles two scenarios:
1. Same HWND got reparented to desktop (mstsc reconnect resets parent)
2. New HWND created (mstsc spawned a new window)
Returns True if a window was found and embedded.
"""
import ctypes
import ctypes.wintypes
user32 = ctypes.windll.user32
# Scenario 1: current HWND is still valid but lost its parent
if self._mstsc_hwnd and user32.IsWindow(self._mstsc_hwnd):
actual_parent = user32.GetParent(self._mstsc_hwnd)
if actual_parent != parent_hwnd:
log.info(f"Re-embed: same HWND={self._mstsc_hwnd} lost parent "
f"(parent={actual_parent}, expected={parent_hwnd})")
self._connected = False
self._embed(parent_hwnd, width, height)
return self._connected
# Scenario 2: old HWND invalid, search for new mstsc window
WNDENUMPROC = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.wintypes.HWND, ctypes.wintypes.LPARAM)
hostname = self.server["ip"]
mstsc_pids = set(_find_mstsc_pids(self._process.pid)) if self._process else set()
found_windows = []
def enum_callback(hwnd, _lparam):
if not user32.IsWindowVisible(hwnd):
return True
class_buf = ctypes.create_unicode_buffer(256)
user32.GetClassNameW(hwnd, class_buf, 256)
class_name = class_buf.value
title_buf = ctypes.create_unicode_buffer(512)
user32.GetWindowTextW(hwnd, title_buf, 512)
title = title_buf.value
pid = ctypes.wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
win_pid = pid.value
is_mstsc_class = class_name in ("TscShellContainerClass", "RAIL_WINDOW", "IHWindowClass")
is_dialog = class_name == "#32770"
pid_match = win_pid in mstsc_pids
title_match = hostname in title
if (pid_match or title_match) and (is_mstsc_class or is_dialog):
if hwnd != self._mstsc_hwnd:
found_windows.append((hwnd, class_name, title, win_pid))
return True
user32.EnumWindows(WNDENUMPROC(enum_callback), 0)
if not found_windows:
return False
# Prefer TscShellContainerClass
target = None
for hwnd, cls, title, pid in found_windows:
if cls == "TscShellContainerClass":
target = hwnd
break
if not target:
target = found_windows[0][0]
log.info(f"Re-embed: found new mstsc window HWND={target}")
self._mstsc_hwnd = target
self._connected = False
self._embed(parent_hwnd, width, height)
return self._connected
def is_embedded(self) -> bool:
"""Check if the mstsc HWND is still a child of our parent frame.
Returns False if:
- HWND is invalid (window destroyed)
- HWND got reparented away from our frame (reconnect scenario)
- No HWND tracked
"""
if not self._mstsc_hwnd or not self._parent_hwnd:
return False
try:
import ctypes
user32 = ctypes.windll.user32
if not user32.IsWindow(self._mstsc_hwnd):
return False
# Check that it's still parented to our frame
actual_parent = user32.GetParent(self._mstsc_hwnd)
return actual_parent == self._parent_hwnd
except Exception:
return False
def is_alive(self) -> bool:
"""Check if mstsc process is still running (parent or children)."""
if not self._process:
return False
# Primary check: is the embedded window still valid?
# This is the most reliable indicator — works regardless of PID tracking
if self._mstsc_hwnd:
try:
import ctypes
if ctypes.windll.user32.IsWindow(self._mstsc_hwnd):
return True
except Exception:
pass
# Fallback: parent process still running
if self._process.poll() is None:
return True
return False
@property
def connected(self) -> bool:
return self._connected and self.is_alive()