""" Terminal tab — persistent interactive SSH shell via ShellSession + TerminalWidget. """ import queue import re import threading import time import customtkinter as ctk from core.ssh_client import ShellSession from core.i18n import t # Regex to strip ANSI escape sequences _ANSI_RE = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]|\x1b\].*?\x07|\x1b[()][AB012]|\x1b\[?\d*[a-zA-Z]') class TerminalTab(ctk.CTkFrame): def __init__(self, master, store, session_pool=None): super().__init__(master, fg_color="transparent") self.store = store self.session_pool = session_pool self._current_alias: str | None = None self._session: ShellSession | None = None self._reconnect_count = 0 self._max_reconnect = 5 self._intentional_disconnect = False # Import here to avoid circular issues from gui.widgets.terminal_widget import TerminalWidget self._terminal = TerminalWidget( self, send_callback=self._send_to_shell, resize_callback=self._on_resize, font_size=store.get_terminal_font_size(), on_font_size_changed=self._on_font_size_changed, ) self._terminal.pack(fill="both", expand=True, padx=5, pady=5) self._terminal.set_status(t("term_disconnected"), "#888888") # Thread-safe data queue self._data_queue: queue.Queue[bytes] = queue.Queue() # Sudo auto-password detection self._sudo_buffer = b"" # Buffer for detecting sudo prompts self._sudo_sent = False # Prevent sending password twice for same prompt def set_server(self, alias: str | None): if alias == self._current_alias: return # Store state of current session before switching if self._current_alias and self._session and self.session_pool: # Store terminal buffer from widget buf = self._terminal.get_current_buffer() self.session_pool.store_shell_state(self._current_alias, buf) # Disconnect current session self._disconnect() self._current_alias = alias if alias: self._connect() else: self._terminal.reset() self._terminal.set_status(t("term_disconnected"), "#888888") def _connect(self): if not self._current_alias: return server = self.store.get_server(self._current_alias) if not server: self._terminal.set_status( t("server_not_found").format(alias=self._current_alias), "#ff4444" ) return alias = self._current_alias self._terminal.set_status(t("term_connecting").format(alias=alias), "#ccaa00") self._intentional_disconnect = False def _do_connect(): try: key_path = self.store.get_ssh_key_path() # Use session pool if available if self.session_pool: cols, rows = self._terminal.get_size() session, is_new = self.session_pool.get_or_create_shell_session(alias, server, key_path) if is_new: # New session — reset terminal for clean start self.after(0, self._terminal.reset) session.cols = cols session.rows = rows session.connect() else: # Reused session — restore full screen state from pool saved_buf = self.session_pool.get_shell_state(alias) def _restore_buffer(): if saved_buf: self._terminal.restore_buffer(saved_buf) else: self._terminal.reset() self.after(0, _restore_buffer) # Set up callbacks even if session already existed session.on_data = self._on_data_received session.on_disconnect = self._on_disconnected self._session = session else: # Legacy behavior without session pool self.after(0, self._terminal.reset) cols, rows = self._terminal.get_size() session = ShellSession(server, key_path, cols=cols, rows=rows) session.on_data = self._on_data_received session.on_disconnect = self._on_disconnected session.connect() self._session = session # Set session on main thread to avoid races def _set_session(): self._reconnect_count = 0 self._terminal.set_status( t("term_connected").format(alias=alias), "#44cc44" ) self._terminal.focus_terminal() self.after(0, _set_session) except Exception as e: self.after(0, lambda: self._terminal.set_status( t("term_connect_failed").format(error=str(e)), "#ff4444" )) threading.Thread(target=_do_connect, daemon=True).start() def _disconnect(self): self._intentional_disconnect = True # Only disconnect if we don't have a session pool (otherwise session stays alive) if not self.session_pool and self._session: self._session.disconnect() self._session = None # If using session pool, session remains active in the pool elif self.session_pool and self._session: # Remove callbacks to prevent processing data after switch self._session.on_data = None self._session.on_disconnect = None self._session = None def _on_data_received(self, data: bytes): """Called from SSH thread — put data in thread-safe queue.""" self._data_queue.put(data) self.after(0, self._flush_data_queue) def _flush_data_queue(self): """Called on main thread — drain queue and feed terminal.""" chunks = [] try: while True: chunks.append(self._data_queue.get_nowait()) except queue.Empty: pass if chunks: combined = b"".join(chunks) self._terminal.feed(combined) # Sudo auto-password: only check if not already sent if not self._sudo_sent and self._session: self._sudo_buffer += combined self._sudo_buffer = self._sudo_buffer[-500:] buf_str = self._sudo_buffer.decode("utf-8", errors="replace") # Strip ANSI escapes and normalize line endings clean = _ANSI_RE.sub("", buf_str) clean = clean.replace("\r\n", "\n").replace("\r", "\n") last_line = clean.rstrip().rsplit("\n", 1)[-1].strip() if "[sudo] password for" in last_line: server = self.store.get_server(self._current_alias) if server and server.get("password"): pwd = server["password"] # Small delay — let the PTY settle before sending def _send_sudo_pass(p=pwd): if self._session and self._session.connected: self._session.send(p.encode("utf-8") + b"\n") self._terminal.set_status( "sudo: password sent automatically", "#44cc44" ) self.after(300, _send_sudo_pass) self._sudo_sent = True self._sudo_buffer = b"" else: self._terminal.set_status( "sudo: no password in server config", "#ff4444" ) def _on_disconnected(self): """Called from SSH read thread.""" def _handle(): if self._intentional_disconnect: self._terminal.set_status(t("term_disconnected"), "#888888") return # Remove dead session from pool so it gets recreated on next connect if self.session_pool and self._current_alias: self.session_pool.disconnect_session(self._current_alias) self._session = None if self._reconnect_count < self._max_reconnect: self._reconnect_count += 1 n = self._reconnect_count mx = self._max_reconnect self._terminal.set_status( t("term_reconnecting").format(n=n, max=mx), "#ccaa00" ) def _retry(): time.sleep(1) if not self._intentional_disconnect and self._current_alias: self.after(0, self._connect) threading.Thread(target=_retry, daemon=True).start() else: self._terminal.set_status(t("term_reconnect_fail"), "#ff4444") self.after(0, _handle) def _send_to_shell(self, data: bytes): session = self._session # local ref for thread safety if session and session.connected: session.send(data) # Reset sudo sent flag when user sends a new command (detects \r or \n) if b'\r' in data or b'\n' in data: self._sudo_sent = False elif self._current_alias and not self._intentional_disconnect and self._reconnect_count == 0: # Session dead, no reconnect in progress — trigger one attempt self._on_disconnected() def _on_resize(self, cols: int, rows: int): session = self._session # local ref for thread safety if session and session.connected: session.resize(cols, rows) def _on_font_size_changed(self, size: int): self.store.set_terminal_font_size(size)