Files
server-manager/gui/tabs/terminal_tab.py
chrome-storm-c442 11ea811c4f v1.8.7: robust sudo auto-password + debug status
- Strip ANSI escapes before sudo prompt detection
- Normalize \r\n/\r line endings for reliable matching
- 300ms delay before sending password (PTY settle time)
- Status bar feedback: "sudo: password sent automatically"
- Encode password explicitly as UTF-8

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 04:14:55 -05:00

243 lines
9.9 KiB
Python

"""
Terminal tab — persistent interactive SSH shell via ShellSession + TerminalWidget.
"""
import queue
import re
import threading
import time
import customtkinter as ctk
from core.ssh_client import ShellSession
from core.i18n import t
# Matches terminal escape sequences so they can be stripped before plain-text
# matching (e.g. sudo prompt detection). Alternatives, in order:
#   1. CSI sequences in the general ECMA-48 shape: ESC [ , parameter bytes
#      (0x30-0x3F, which includes '?' used by DEC private modes such as
#      ESC[?2004h bracketed-paste), intermediate bytes (0x20-0x2F) and one
#      final byte (0x40-0x7E).
#   2. OSC sequences (ESC ] ... ) terminated by BEL or by ST (ESC \).
#   3. Character-set designations: ESC ( or ESC ) followed by A, B, 0, 1 or 2.
#   4. ESC followed by optional digits and a letter (kept from the previous
#      pattern for backward compatibility).
_ANSI_RE = re.compile(
    r'\x1b\[[0-?]*[ -/]*[@-~]'
    r'|\x1b\].*?(?:\x07|\x1b\\)'
    r'|\x1b[()][AB012]'
    r'|\x1b\d*[a-zA-Z]'
)
class TerminalTab(ctk.CTkFrame):
    """Terminal tab: one interactive SSH shell bound to the selected server.

    Bytes received from the shell arrive on the SSH reader thread, are handed
    to the Tk main thread through a queue, and are fed to the embedded
    ``TerminalWidget``. When a ``session_pool`` is supplied, shell sessions
    stay alive across server switches and the screen contents are saved to /
    restored from the pool; without a pool every switch closes the session and
    a fresh ``ShellSession`` is opened.

    The tab also answers ``[sudo] password for ...`` prompts automatically
    with the password stored in the server's configuration, if there is one.
    """

    # Status-bar colours.
    _COLOR_IDLE = "#888888"
    _COLOR_BUSY = "#ccaa00"
    _COLOR_OK = "#44cc44"
    _COLOR_ERROR = "#ff4444"

    # Delay before a dropped connection is retried.
    _RECONNECT_DELAY_MS = 1000
    # Delay between seeing the sudo prompt and sending the password, giving
    # the remote PTY time to settle.
    _SUDO_SEND_DELAY_MS = 300
    # Only the tail of the output is kept for sudo prompt detection.
    _SUDO_BUFFER_LIMIT = 500
    _SUDO_PROMPT_MARKER = "[sudo] password for"

    def __init__(self, master, store, session_pool=None):
        """Build the tab and its terminal widget; no connection is opened yet.

        Args:
            master: Parent widget.
            store: Settings/server store. The tab calls ``get_server``,
                ``get_ssh_key_path``, ``get_terminal_font_size`` and
                ``set_terminal_font_size`` on it.
            session_pool: Optional pool that owns shell sessions so they
                survive server switches. ``None`` selects the legacy
                one-session-per-connect behaviour.
        """
        super().__init__(master, fg_color="transparent")
        self.store = store
        self.session_pool = session_pool
        self._current_alias: str | None = None
        self._session: ShellSession | None = None
        self._reconnect_count = 0
        self._max_reconnect = 5
        # True while a disconnect was requested by this tab (server switch),
        # so the disconnect handler does not try to reconnect.
        self._intentional_disconnect = False

        # Imported here rather than at module level to avoid circular imports.
        from gui.widgets.terminal_widget import TerminalWidget

        self._terminal = TerminalWidget(
            self,
            send_callback=self._send_to_shell,
            resize_callback=self._on_resize,
            font_size=store.get_terminal_font_size(),
            on_font_size_changed=self._on_font_size_changed,
        )
        self._terminal.pack(fill="both", expand=True, padx=5, pady=5)
        self._terminal.set_status(t("term_disconnected"), self._COLOR_IDLE)

        # Hand-off queue between the SSH reader thread and the Tk main thread.
        self._data_queue: queue.Queue[bytes] = queue.Queue()

        # Sudo auto-password detection state.
        self._sudo_buffer = b""   # tail of recent output, searched for the prompt
        self._sudo_sent = False   # avoids answering the same prompt twice

    def set_server(self, alias: str | None):
        """Switch the tab to ``alias`` (or to no server when ``alias`` is None).

        Does nothing if ``alias`` is already the current server. Otherwise the
        current screen is saved to the session pool (when one is in use), the
        current session is detached/closed, and a connection to the new
        server is started.
        """
        if alias == self._current_alias:
            return

        # Save the visible screen of the session we are leaving.
        if self._current_alias and self._session and self.session_pool:
            screen = self._terminal.get_current_buffer()
            self.session_pool.store_shell_state(self._current_alias, screen)

        self._disconnect()
        self._current_alias = alias

        if alias:
            self._connect()
        else:
            self._terminal.reset()
            self._terminal.set_status(t("term_disconnected"), self._COLOR_IDLE)

    def _connect(self):
        """Start connecting to the current alias on a background thread.

        The blocking SSH work runs in a daemon thread; the resulting session
        is attached to the tab on the Tk main thread, and only if the user is
        still on the same server by then.
        """
        alias = self._current_alias
        if not alias:
            return

        server = self.store.get_server(alias)
        if not server:
            self._terminal.set_status(
                t("server_not_found").format(alias=alias), self._COLOR_ERROR
            )
            return

        self._terminal.set_status(
            t("term_connecting").format(alias=alias), self._COLOR_BUSY
        )
        self._intentional_disconnect = False

        def _attach(session):
            # Main thread. If the user switched servers while the connection
            # was being established, this session must not be attached.
            if alias != self._current_alias:
                self._discard_stale_session(session)
                return
            self._session = session
            self._reconnect_count = 0
            self._terminal.set_status(
                t("term_connected").format(alias=alias), self._COLOR_OK
            )
            self._terminal.focus_terminal()

        def _report_failure(error_text):
            self._terminal.set_status(
                t("term_connect_failed").format(error=error_text),
                self._COLOR_ERROR,
            )

        def _do_connect():
            try:
                key_path = self.store.get_ssh_key_path()
                if self.session_pool:
                    session = self._open_pooled_session(alias, server, key_path)
                else:
                    session = self._open_direct_session(server, key_path)
                self.after(0, _attach, session)
            except Exception as exc:
                # The exception name is unbound when this clause exits, so the
                # message is captured now instead of inside a deferred closure.
                self.after(0, _report_failure, str(exc))

        threading.Thread(target=_do_connect, daemon=True).start()

    def _open_pooled_session(self, alias, server, key_path):
        """Get the pool's session for ``alias``, connecting it if it is new.

        A new session gets a freshly reset terminal; a reused one gets the
        screen contents previously saved in the pool (or a reset terminal if
        nothing was saved). Called from the connect worker thread.
        """
        cols, rows = self._terminal.get_size()
        session, is_new = self.session_pool.get_or_create_shell_session(
            alias, server, key_path
        )
        if is_new:
            self.after(0, self._terminal.reset)
            session.cols = cols
            session.rows = rows
            session.connect()
        else:
            saved_screen = self.session_pool.get_shell_state(alias)

            def _restore_screen():
                if saved_screen:
                    self._terminal.restore_buffer(saved_screen)
                else:
                    self._terminal.reset()

            self.after(0, _restore_screen)

        # Callbacks are (re)installed even when the session already existed,
        # because they are cleared whenever the tab switches away from it.
        session.on_data = self._on_data_received
        session.on_disconnect = self._on_disconnected
        return session

    def _open_direct_session(self, server, key_path):
        """Create and connect a standalone ``ShellSession`` (no session pool).

        Called from the connect worker thread.
        """
        self.after(0, self._terminal.reset)
        cols, rows = self._terminal.get_size()
        session = ShellSession(server, key_path, cols=cols, rows=rows)
        session.on_data = self._on_data_received
        session.on_disconnect = self._on_disconnected
        session.connect()
        return session

    def _discard_stale_session(self, session):
        """Detach a session whose connect finished after the user moved on.

        Pooled sessions stay alive in the pool; standalone sessions have no
        other owner and are closed.
        """
        session.on_data = None
        session.on_disconnect = None
        if not self.session_pool:
            session.disconnect()

    def _disconnect(self):
        """Detach from the current session because the user is switching away.

        Without a session pool the session is closed. With a pool the session
        keeps running inside the pool and only its callbacks are removed, so
        its output is no longer routed to this tab.
        """
        self._intentional_disconnect = True

        # Sudo detection state belongs to the session being left; keeping it
        # could make output from the next server complete a stale prompt.
        self._sudo_buffer = b""
        self._sudo_sent = False

        session = self._session
        if not session:
            return
        if self.session_pool:
            session.on_data = None
            session.on_disconnect = None
        else:
            session.disconnect()
        self._session = None

    def _on_data_received(self, data: bytes):
        """Called from SSH thread — queue the data and wake the main thread."""
        self._data_queue.put(data)
        self.after(0, self._flush_data_queue)

    def _flush_data_queue(self):
        """Called on main thread — drain the queue and feed the terminal."""
        chunks = []
        try:
            while True:
                chunks.append(self._data_queue.get_nowait())
        except queue.Empty:
            pass
        if not chunks:
            return

        combined = b"".join(chunks)
        self._terminal.feed(combined)
        self._check_sudo_prompt(combined)

    def _check_sudo_prompt(self, data: bytes):
        """Answer a sudo password prompt with the server's stored password.

        The last ``_SUDO_BUFFER_LIMIT`` bytes of output are kept; after ANSI
        escapes are stripped and line endings normalised, the last line is
        searched for the sudo prompt marker. Nothing happens if the current
        prompt was already answered or no session is attached.

        NOTE(review): the trigger is plain text produced by the remote side,
        so any remote output ending in the marker causes the stored password
        to be sent. Confirm this is acceptable for the threat model.
        """
        session = self._session
        if self._sudo_sent or not session:
            return

        self._sudo_buffer = (self._sudo_buffer + data)[-self._SUDO_BUFFER_LIMIT:]
        text = self._sudo_buffer.decode("utf-8", errors="replace")
        text = _ANSI_RE.sub("", text)
        text = text.replace("\r\n", "\n").replace("\r", "\n")
        last_line = text.rstrip().rsplit("\n", 1)[-1].strip()
        if self._SUDO_PROMPT_MARKER not in last_line:
            return

        server = self.store.get_server(self._current_alias)
        if not (server and server.get("password")):
            self._terminal.set_status(
                "sudo: no password in server config", self._COLOR_ERROR
            )
            return

        password = server["password"]

        def _send_sudo_pass():
            # Send only to the session that showed the prompt: if the user
            # switched servers during the delay, the password must not be
            # typed into a different shell.
            if self._session is session and session.connected:
                session.send(password.encode("utf-8") + b"\n")
                self._terminal.set_status(
                    "sudo: password sent automatically", self._COLOR_OK
                )

        # Small delay — let the PTY settle before sending.
        self.after(self._SUDO_SEND_DELAY_MS, _send_sudo_pass)
        self._sudo_sent = True
        self._sudo_buffer = b""

    def _on_disconnected(self):
        """Called from SSH read thread — handle the drop on the main thread."""
        self.after(0, self._handle_disconnect)

    def _handle_disconnect(self):
        """React to a lost session: report it or schedule a reconnect.

        An intentional disconnect only updates the status. Otherwise the dead
        session is removed from the pool (so the next connect recreates it)
        and up to ``_max_reconnect`` reconnect attempts are made, each after
        ``_RECONNECT_DELAY_MS``.
        """
        if self._intentional_disconnect:
            self._terminal.set_status(t("term_disconnected"), self._COLOR_IDLE)
            return

        alias = self._current_alias
        if self.session_pool and alias:
            self.session_pool.disconnect_session(alias)
        self._session = None

        if self._reconnect_count >= self._max_reconnect:
            self._terminal.set_status(t("term_reconnect_fail"), self._COLOR_ERROR)
            return

        self._reconnect_count += 1
        self._terminal.set_status(
            t("term_reconnecting").format(
                n=self._reconnect_count, max=self._max_reconnect
            ),
            self._COLOR_BUSY,
        )

        def _retry():
            # Skip the retry if the user disconnected or moved to another
            # server in the meantime (that switch starts its own connect).
            if (
                not self._intentional_disconnect
                and alias
                and self._current_alias == alias
            ):
                self._connect()

        self.after(self._RECONNECT_DELAY_MS, _retry)

    def _send_to_shell(self, data: bytes):
        """Forward keyboard input from the terminal widget to the shell.

        If no live session is attached and no reconnect is in progress, one
        reconnect attempt is triggered instead.
        """
        session = self._session  # local ref for thread safety
        if session and session.connected:
            session.send(data)
            # A line terminator means the user submitted a new command, so a
            # later sudo prompt is a new prompt and may be answered again.
            if b'\r' in data or b'\n' in data:
                self._sudo_sent = False
        elif (
            self._current_alias
            and not self._intentional_disconnect
            and self._reconnect_count == 0
        ):
            # Session dead, no reconnect in progress — trigger one attempt.
            self._on_disconnected()

    def _on_resize(self, cols: int, rows: int):
        """Propagate a terminal widget resize to the remote PTY, if connected."""
        session = self._session  # local ref for thread safety
        if session and session.connected:
            session.resize(cols, rows)

    def _on_font_size_changed(self, size: int):
        """Persist the terminal font size chosen in the widget."""
        self.store.set_terminal_font_size(size)