""" Auto-updater — checks Gitea releases, downloads and applies updates. """ import base64 import hashlib import json import os import platform import re import subprocess import sys import tempfile import threading import time import urllib.error import urllib.request from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from core.server_store import ServerStore from core.logger import log from version import __version__ # Check interval: 1 hour _CHECK_INTERVAL = 3600 # Hide console windows on Windows for subprocess calls _SUBPROCESS_FLAGS = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) def _platform_tag() -> str: """Get platform tag matching build.py naming: win-x64, linux-x64, etc.""" s = platform.system().lower() m = platform.machine().lower() arch_map = { "x86_64": "x64", "amd64": "x64", "x86": "x32", "i686": "x32", "i386": "x32", "aarch64": "arm64", "arm64": "arm64", "armv7l": "arm", } arch = arch_map.get(m, m) os_map = {"windows": "win", "linux": "linux", "darwin": "mac"} os_tag = os_map.get(s, s) return f"{os_tag}-{arch}" def _parse_version(ver: str) -> tuple[int, int, int]: """Parse semver string into (major, minor, patch).""" m = re.match(r"v?(\d+)\.(\d+)\.(\d+)", ver) if not m: return (0, 0, 0) return int(m.group(1)), int(m.group(2)), int(m.group(3)) def _version_newer(remote: str, local: str) -> bool: """Return True if remote version is newer than local.""" return _parse_version(remote) > _parse_version(local) _GITEA_API_BASE = "https://git.sensey24.ru/api/v1/repos/aibot777/server-manager" def _get_auth_headers() -> dict: """Try to get Gitea auth headers from git remote. Returns empty dict on failure.""" try: result = subprocess.run( ["git", "remote", "get-url", "sensey"], capture_output=True, text=True, cwd=PROJECT_DIR, creationflags=_SUBPROCESS_FLAGS, ) remote_url = result.stdout.strip() m = re.match(r"https://([^:]+):([^@]+)@", remote_url) if m: user, password = m.groups() return { "Authorization": "Basic " + base64.b64encode( f"{user}:{password}".encode() ).decode() } except Exception: pass return {} def _gitea_api(endpoint: str) -> Optional[dict]: """Call Gitea API. Tries git remote auth first, falls back to public API.""" url = f"{_GITEA_API_BASE}/{endpoint}" req = urllib.request.Request(url, headers=_get_auth_headers(), method="GET") try: with urllib.request.urlopen(req, timeout=15) as resp: return json.loads(resp.read().decode()) except Exception: return None class UpdateChecker: """Background thread that checks for new releases on Gitea.""" def __init__(self, store: "ServerStore", gui_callback=None): self.store = store self._gui_callback = gui_callback self._running = False self._thread: Optional[threading.Thread] = None self._latest: Optional[dict] = None # cached latest release info self._download_path: Optional[str] = None def start(self): if self._running: return self._running = True self._thread = threading.Thread(target=self._loop, daemon=True) self._thread.start() def stop(self): self._running = False if self._thread: self._thread.join(timeout=3.0) @property def latest(self) -> Optional[dict]: """Latest release info: {version, url, changelog, filename, sha256}.""" return self._latest @property def download_path(self) -> Optional[str]: return self._download_path def check_now(self) -> Optional[dict]: """Manual check — returns release info or None.""" info = self._fetch_latest_release() if info: self._latest = info # Update timestamp self.store.set_last_update_check() return info def _fetch_latest_release(self) -> Optional[dict]: """Fetch latest release from Gitea API.""" try: data = _gitea_api("releases?limit=1") except Exception: return None if not data or not isinstance(data, list) or len(data) == 0: return None release = data[0] tag = release.get("tag_name", "") version = tag.lstrip("v") if not _version_newer(version, __version__): return None # Skip if user chose to skip this version skip = self.store.get_skip_version() if skip and version == skip: return None # Find platform-specific asset assets = release.get("assets", []) asset = self._get_platform_asset(assets) if not asset: return None return { "version": version, "tag": tag, "url": asset["browser_download_url"], "filename": asset["name"], "size": asset.get("size", 0), "changelog": release.get("body", ""), "sha256": self._extract_sha256(release.get("body", ""), asset["name"]), } def _get_platform_asset(self, assets: list[dict]) -> Optional[dict]: """Find the asset matching current platform.""" tag = _platform_tag() for asset in assets: name = asset.get("name", "") if tag in name: return asset return None def _extract_sha256(self, body: str, filename: str) -> Optional[str]: """Try to extract SHA256 hash from release body.""" # Common formats: `sha256: abc123...` or `abc123... filename` for line in body.splitlines(): if filename in line: m = re.search(r"[a-f0-9]{64}", line, re.IGNORECASE) if m: return m.group(0) # Also check for generic sha256 line m = re.search(r"sha256[:\s]+([a-f0-9]{64})", body, re.IGNORECASE) if m: return m.group(1) return None def download_update(self, url: str, progress_cb=None) -> Optional[str]: """Download update binary to temp dir. Returns path or None.""" try: headers = _get_auth_headers() req = urllib.request.Request(url, headers=headers, method="GET") resp = urllib.request.urlopen(req, timeout=120) total = int(resp.headers.get("Content-Length", 0)) downloaded = 0 # Save to temp file suffix = ".exe" if sys.platform == "win32" else "" fd, tmp_path = tempfile.mkstemp(suffix=suffix, prefix="sm_update_") with os.fdopen(fd, "wb") as f: while True: chunk = resp.read(65536) if not chunk: break f.write(chunk) downloaded += len(chunk) if progress_cb and total > 0: progress_cb(downloaded, total) resp.close() # Verify SHA256 if available if self._latest and self._latest.get("sha256"): expected = self._latest["sha256"].lower() sha = hashlib.sha256() with open(tmp_path, "rb") as f: while True: chunk = f.read(65536) if not chunk: break sha.update(chunk) actual = sha.hexdigest() if actual != expected: log.error(f"SHA256 mismatch: expected {expected}, got {actual}") os.remove(tmp_path) return None self._download_path = tmp_path return tmp_path except Exception as e: log.error(f"Download failed: {e}") return None def apply_update(self, binary_path: str): """Replace current exe and restart.""" if not getattr(sys, "frozen", False): log.info("Not a frozen exe, skipping apply") return False current_exe = sys.executable if sys.platform == "win32": return self._apply_windows(binary_path, current_exe) else: return self._apply_linux(binary_path, current_exe) def _apply_windows(self, new_exe: str, current_exe: str) -> bool: """Windows update: BAT script waits for exit, copies, launches.""" try: tmp_dir = tempfile.gettempdir() bat_path = os.path.join(tmp_dir, "sm_update.bat") log_path = os.path.join(tmp_dir, "sm_update.log") pid = os.getpid() new_size = os.path.getsize(new_exe) bat_content = f"""@echo off setlocal enabledelayedexpansion chcp 65001 >nul 2>&1 set "LOGFILE={log_path}" set "SRC={new_exe}" set "DST={current_exe}" set "PID={pid}" set "TMPDIR={tmp_dir}" set "EXPECTED_SIZE={new_size}" echo [%date% %time%] Update script started >> "%LOGFILE%" echo [%date% %time%] PID to wait for: %PID% >> "%LOGFILE%" echo [%date% %time%] SRC: %SRC% >> "%LOGFILE%" echo [%date% %time%] DST: %DST% >> "%LOGFILE%" echo [%date% %time%] Expected size: %EXPECTED_SIZE% >> "%LOGFILE%" :wait_loop tasklist /FI "PID eq %PID%" 2>nul | find "%PID%" >nul if %errorlevel%==0 ( echo [%date% %time%] Waiting for PID %PID% to exit... >> "%LOGFILE%" timeout /t 1 /nobreak >nul goto wait_loop ) echo [%date% %time%] Process exited, waiting 3s... >> "%LOGFILE%" timeout /t 3 /nobreak >nul rem Clean stale _MEI directories echo [%date% %time%] Cleaning _MEI directories... >> "%LOGFILE%" for /d %%D in ("%TMPDIR%\\_MEI*") do ( rmdir /s /q "%%D" >nul 2>&1 ) timeout /t 1 /nobreak >nul rem Log source file size for %%F in ("%SRC%") do ( echo [%date% %time%] SRC file size: %%~zF >> "%LOGFILE%" ) rem Delete old DST first so copy is clean echo [%date% %time%] Deleting old DST... >> "%LOGFILE%" del /f /q "%DST%" >nul 2>&1 if exist "%DST%" ( echo [%date% %time%] WARNING: could not delete old DST >> "%LOGFILE%" ) timeout /t 1 /nobreak >nul rem Copy with retry and size verification echo [%date% %time%] Starting copy... >> "%LOGFILE%" set COPIED=0 for /L %%i in (1,1,5) do ( if !COPIED!==0 ( echo [%date% %time%] Copy attempt %%i... >> "%LOGFILE%" copy /Y /B "%SRC%" "%DST%" >> "%LOGFILE%" 2>&1 if exist "%DST%" ( for %%F in ("%DST%") do set DST_SIZE=%%~zF echo [%date% %time%] DST size after copy: !DST_SIZE! >> "%LOGFILE%" if "!DST_SIZE!"=="%EXPECTED_SIZE%" ( echo [%date% %time%] Size verified OK >> "%LOGFILE%" set COPIED=1 ) else ( echo [%date% %time%] Size mismatch! Expected %EXPECTED_SIZE%, got !DST_SIZE! >> "%LOGFILE%" del /f /q "%DST%" >nul 2>&1 timeout /t 2 /nobreak >nul ) ) else ( echo [%date% %time%] Copy failed - DST does not exist >> "%LOGFILE%" timeout /t 2 /nobreak >nul ) ) ) if %COPIED%==0 ( echo [%date% %time%] FAILED: could not copy after 5 attempts >> "%LOGFILE%" pause goto cleanup ) echo [%date% %time%] Copy successful, launching... >> "%LOGFILE%" timeout /t 2 /nobreak >nul rem Launch new exe echo [%date% %time%] Starting: %DST% >> "%LOGFILE%" start "" "%DST%" echo [%date% %time%] Launch command issued >> "%LOGFILE%" timeout /t 2 /nobreak >nul :cleanup rem Delete downloaded update file del /f /q "%SRC%" >nul 2>&1 echo [%date% %time%] Cleanup done >> "%LOGFILE%" rem Self-delete del /f /q "%~f0" >nul 2>&1 """ with open(bat_path, "w", encoding="utf-8") as f: f.write(bat_content) log.info(f"Update BAT: {bat_path}, log: {log_path}") # Launch BAT minimized subprocess.Popen( ["cmd.exe", "/c", "start", "/min", "", bat_path], creationflags=_SUBPROCESS_FLAGS, ) return True except Exception as e: log.error(f"Windows update failed: {e}") return False def _apply_linux(self, new_exe: str, current_exe: str) -> bool: """Linux update: replace in-place and exec.""" try: os.replace(new_exe, current_exe) os.chmod(current_exe, 0o755) os.execv(current_exe, sys.argv) return True # won't reach here except Exception as e: log.error(f"Linux update failed: {e}") return False def _loop(self): """Background loop — check periodically.""" # Initial delay: 10 seconds after startup for _ in range(100): if not self._running: return time.sleep(0.1) while self._running: # Check if enough time passed since last check last_check = self.store.get_last_update_check() now = time.time() if not last_check or (now - last_check) >= _CHECK_INTERVAL: info = self.check_now() if info and self._gui_callback: mode = self.store.get_update_mode() if mode == "full-auto": # Auto-download and apply path = self.download_update(info["url"]) if path: try: self._gui_callback("auto_apply", info, path) except Exception: pass elif mode == "auto-download": # Download in background, then notify path = self.download_update(info["url"]) if path: try: self._gui_callback("downloaded", info, path) except Exception: pass else: try: self._gui_callback("available", info, None) except Exception: pass else: # notify-only try: self._gui_callback("available", info, None) except Exception: pass # Sleep for 5 minutes, checking _running flag for _ in range(3000): if not self._running: return time.sleep(0.1)