"""
Terminal tab — persistent interactive SSH shell via ShellSession + TerminalWidget.
"""

import queue
import threading
import time

import customtkinter as ctk

from core.ssh_client import ShellSession
from core.i18n import t


class TerminalTab(ctk.CTkFrame):
    """Frame hosting a TerminalWidget wired to a ShellSession.

    The tab connects to the server selected through ``set_server``, feeds
    received bytes into the terminal widget, forwards keystrokes and resize
    events to the session, and retries a dropped connection up to
    ``_max_reconnect`` times.

    Connecting and the reconnect delay run on daemon threads; widget updates
    are handed to the Tk event loop through ``self.after(0, ...)``.
    """

    def __init__(self, master, store):
        """Build the terminal widget and initialise the connection state.

        Args:
            master: Parent widget passed to ``CTkFrame``.
            store: Object providing ``get_server(alias)`` and
                ``get_ssh_key_path()``.
        """
        super().__init__(master, fg_color="transparent")
        self.store = store
        self._current_alias: str | None = None
        self._session: ShellSession | None = None
        self._reconnect_count = 0
        self._max_reconnect = 3
        self._intentional_disconnect = False
        # Bumped on every connect/disconnect request. Background work captures
        # the value it started with and compares it later to detect that it
        # has been superseded by a newer request.
        self._connect_generation = 0

        # Import here to avoid circular issues
        from gui.widgets.terminal_widget import TerminalWidget

        self._terminal = TerminalWidget(
            self,
            send_callback=self._send_to_shell,
            resize_callback=self._on_resize,
        )
        self._terminal.pack(fill="both", expand=True, padx=5, pady=5)
        self._terminal.set_status(t("term_disconnected"), "#888888")

        # Thread-safe data batching
        self._data_queue: queue.Queue[bytes] = queue.Queue()
        self._flush_after_id = None

    def set_server(self, alias: str | None):
        """Switch the tab to ``alias``; ``None`` (or empty) just disconnects.

        Does nothing when ``alias`` is already the current one.
        """
        if alias == self._current_alias:
            return
        self._disconnect()
        self._current_alias = alias
        if alias:
            self._connect()
        else:
            self._terminal.reset()
            self._terminal.set_status(t("term_disconnected"), "#888888")

    def _connect(self):
        """Start a background connection attempt to the current alias."""
        if not self._current_alias:
            return
        server = self.store.get_server(self._current_alias)
        if not server:
            self._terminal.set_status(
                t("server_not_found").format(alias=self._current_alias), "#ff4444"
            )
            return

        alias = self._current_alias
        self._terminal.set_status(t("term_connecting").format(alias=alias), "#ccaa00")
        self._terminal.reset()
        self._intentional_disconnect = False

        # Token identifying this attempt; any later connect/disconnect request
        # changes the counter and thereby marks this attempt as stale.
        self._connect_generation += 1
        generation = self._connect_generation

        def _is_current() -> bool:
            return generation == self._connect_generation

        def _do_connect():
            try:
                key_path = self.store.get_ssh_key_path()
                cols, rows = self._terminal.get_size()
                session = ShellSession(server, key_path, cols=cols, rows=rows)
                session.on_data = self._on_data_received
                # Bind the callback to this attempt so a late notification
                # from an old session cannot disturb a newer one.
                session.on_disconnect = lambda: self._on_disconnected(generation)
                session.connect()

                if not _is_current():
                    # The user switched server or disconnected while we were
                    # connecting: drop this session instead of installing it.
                    session.disconnect()
                    return

                self._session = session
                self._reconnect_count = 0
                self.after(0, lambda: self._terminal.set_status(
                    t("term_connected").format(alias=alias), "#44cc44"
                ))
                self.after(0, self._terminal.focus_terminal)
            except Exception as e:
                if not _is_current():
                    # A newer request owns the status line; stay silent.
                    return
                # The name bound by "except ... as" is deleted when the clause
                # ends, so the text must be captured before the callback runs.
                error_text = str(e)
                self.after(0, lambda: self._terminal.set_status(
                    t("term_connect_failed").format(error=error_text), "#ff4444"
                ))

        threading.Thread(target=_do_connect, daemon=True).start()

    def _disconnect(self):
        """Close the active session, if any, and suppress auto-reconnect."""
        self._intentional_disconnect = True
        # Invalidate in-flight connection attempts and pending retries.
        self._connect_generation += 1
        # Detach first so the reference is cleared even if disconnect() raises.
        session, self._session = self._session, None
        if session:
            session.disconnect()

    def _on_data_received(self, data: bytes):
        """Called from SSH thread — put data in thread-safe queue."""
        self._data_queue.put(data)
        self.after(0, self._flush_data_queue)

    def _flush_data_queue(self):
        """Called on main thread — drain queue and feed terminal."""
        chunks = []
        try:
            while True:
                chunks.append(self._data_queue.get_nowait())
        except queue.Empty:
            pass
        if chunks:
            # One feed() call for everything queued since the last flush.
            self._terminal.feed(b"".join(chunks))

    def _on_disconnected(self, generation: int | None = None):
        """Handle a session's disconnect notification.

        Args:
            generation: Connection-attempt token of the session reporting the
                disconnect. ``None`` (the default, for callers that pass
                nothing) means the notification is never treated as stale.
        """
        if self._intentional_disconnect:
            self.after(0, lambda: self._terminal.set_status(
                t("term_disconnected"), "#888888"
            ))
            return

        if generation is not None and generation != self._connect_generation:
            # Notification from a superseded session: a newer connection owns
            # the tab, so neither clear its session nor start a reconnect.
            return

        self._session = None
        if self._reconnect_count < self._max_reconnect:
            self._reconnect_count += 1
            n = self._reconnect_count
            mx = self._max_reconnect
            self.after(0, lambda: self._terminal.set_status(
                t("term_reconnecting").format(n=n, max=mx), "#ccaa00"
            ))
            expected_generation = self._connect_generation

            def _retry():
                time.sleep(1)
                # Skip the retry if anything else connected or disconnected
                # during the delay.
                if (
                    not self._intentional_disconnect
                    and self._current_alias
                    and expected_generation == self._connect_generation
                ):
                    self.after(0, self._connect)

            threading.Thread(target=_retry, daemon=True).start()
        else:
            self.after(0, lambda: self._terminal.set_status(
                t("term_reconnect_fail"), "#ff4444"
            ))

    def _send_to_shell(self, data: bytes):
        """Forward terminal input to the session when it is connected."""
        if self._session and self._session.connected:
            self._session.send(data)

    def _on_resize(self, cols: int, rows: int):
        """Propagate a terminal size change to the connected session."""
        if self._session and self._session.connected:
            self._session.resize(cols, rows)