""" Terminal tab — persistent interactive SSH shell via ShellSession + TerminalWidget. """ import queue import threading import time import customtkinter as ctk from core.ssh_client import ShellSession from core.i18n import t class TerminalTab(ctk.CTkFrame): def __init__(self, master, store): super().__init__(master, fg_color="transparent") self.store = store self._current_alias: str | None = None self._session: ShellSession | None = None self._reconnect_count = 0 self._max_reconnect = 3 self._intentional_disconnect = False # Import here to avoid circular issues from gui.widgets.terminal_widget import TerminalWidget self._terminal = TerminalWidget( self, send_callback=self._send_to_shell, resize_callback=self._on_resize, ) self._terminal.pack(fill="both", expand=True, padx=5, pady=5) self._terminal.set_status(t("term_disconnected"), "#888888") # Thread-safe data queue self._data_queue: queue.Queue[bytes] = queue.Queue() def set_server(self, alias: str | None): if alias == self._current_alias: return self._disconnect() self._current_alias = alias if alias: self._connect() else: self._terminal.reset() self._terminal.set_status(t("term_disconnected"), "#888888") def _connect(self): if not self._current_alias: return server = self.store.get_server(self._current_alias) if not server: self._terminal.set_status( t("server_not_found").format(alias=self._current_alias), "#ff4444" ) return alias = self._current_alias self._terminal.set_status(t("term_connecting").format(alias=alias), "#ccaa00") self._terminal.reset() self._intentional_disconnect = False def _do_connect(): try: key_path = self.store.get_ssh_key_path() cols, rows = self._terminal.get_size() session = ShellSession(server, key_path, cols=cols, rows=rows) session.on_data = self._on_data_received session.on_disconnect = self._on_disconnected session.connect() # Set session on main thread to avoid races def _set_session(): self._session = session self._reconnect_count = 0 self._terminal.set_status( t("term_connected").format(alias=alias), "#44cc44" ) self._terminal.focus_terminal() self.after(0, _set_session) except Exception as e: self.after(0, lambda: self._terminal.set_status( t("term_connect_failed").format(error=str(e)), "#ff4444" )) threading.Thread(target=_do_connect, daemon=True).start() def _disconnect(self): self._intentional_disconnect = True session = self._session self._session = None if session: session.disconnect() def _on_data_received(self, data: bytes): """Called from SSH thread — put data in thread-safe queue.""" self._data_queue.put(data) self.after(0, self._flush_data_queue) def _flush_data_queue(self): """Called on main thread — drain queue and feed terminal.""" chunks = [] try: while True: chunks.append(self._data_queue.get_nowait()) except queue.Empty: pass if chunks: self._terminal.feed(b"".join(chunks)) def _on_disconnected(self): """Called from SSH read thread.""" def _handle(): if self._intentional_disconnect: self._terminal.set_status(t("term_disconnected"), "#888888") return self._session = None if self._reconnect_count < self._max_reconnect: self._reconnect_count += 1 n = self._reconnect_count mx = self._max_reconnect self._terminal.set_status( t("term_reconnecting").format(n=n, max=mx), "#ccaa00" ) def _retry(): time.sleep(1) if not self._intentional_disconnect and self._current_alias: self.after(0, self._connect) threading.Thread(target=_retry, daemon=True).start() else: self._terminal.set_status(t("term_reconnect_fail"), "#ff4444") self.after(0, _handle) def _send_to_shell(self, data: bytes): session = self._session # local ref for thread safety if session and session.connected: session.send(data) def _on_resize(self, cols: int, rows: int): session = self._session # local ref for thread safety if session and session.connected: session.resize(cols, rows)