""" Auto-updater — checks Gitea releases, downloads and applies updates. """ import base64 import hashlib import json import os import platform import re import subprocess import sys import tempfile import threading import time import urllib.error import urllib.request from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from core.server_store import ServerStore from core.logger import log from version import __version__ # Check interval: 1 hour _CHECK_INTERVAL = 3600 # Hide console windows on Windows for subprocess calls _SUBPROCESS_FLAGS = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) def _platform_tag() -> str: """Get platform tag matching build.py naming: win-x64, linux-x64, etc.""" s = platform.system().lower() m = platform.machine().lower() arch_map = { "x86_64": "x64", "amd64": "x64", "x86": "x32", "i686": "x32", "i386": "x32", "aarch64": "arm64", "arm64": "arm64", "armv7l": "arm", } arch = arch_map.get(m, m) os_map = {"windows": "win", "linux": "linux", "darwin": "mac"} os_tag = os_map.get(s, s) return f"{os_tag}-{arch}" def _parse_version(ver: str) -> tuple[int, int, int]: """Parse semver string into (major, minor, patch).""" m = re.match(r"v?(\d+)\.(\d+)\.(\d+)", ver) if not m: return (0, 0, 0) return int(m.group(1)), int(m.group(2)), int(m.group(3)) def _version_newer(remote: str, local: str) -> bool: """Return True if remote version is newer than local.""" return _parse_version(remote) > _parse_version(local) _GITEA_API_BASE = "https://git.sensey24.ru/api/v1/repos/aibot777/server-manager" def _get_auth_headers() -> dict: """Try to get Gitea auth headers from git remote. Returns empty dict on failure.""" try: result = subprocess.run( ["git", "remote", "get-url", "sensey"], capture_output=True, text=True, cwd=PROJECT_DIR, creationflags=_SUBPROCESS_FLAGS, ) remote_url = result.stdout.strip() m = re.match(r"https://([^:]+):([^@]+)@", remote_url) if m: user, password = m.groups() return { "Authorization": "Basic " + base64.b64encode( f"{user}:{password}".encode() ).decode() } except Exception: pass return {} def _gitea_api(endpoint: str) -> Optional[dict]: """Call Gitea API. Tries git remote auth first, falls back to public API.""" url = f"{_GITEA_API_BASE}/{endpoint}" req = urllib.request.Request(url, headers=_get_auth_headers(), method="GET") try: with urllib.request.urlopen(req, timeout=15) as resp: return json.loads(resp.read().decode()) except Exception: return None class UpdateChecker: """Background thread that checks for new releases on Gitea.""" def __init__(self, store: "ServerStore", gui_callback=None): self.store = store self._gui_callback = gui_callback self._running = False self._thread: Optional[threading.Thread] = None self._latest: Optional[dict] = None # cached latest release info self._download_path: Optional[str] = None def start(self): if self._running: return self._running = True self._thread = threading.Thread(target=self._loop, daemon=True) self._thread.start() def stop(self): self._running = False if self._thread: self._thread.join(timeout=3.0) @property def latest(self) -> Optional[dict]: """Latest release info: {version, url, changelog, filename, sha256}.""" return self._latest @property def download_path(self) -> Optional[str]: return self._download_path def check_now(self) -> Optional[dict]: """Manual check — returns release info or None.""" info = self._fetch_latest_release() if info: self._latest = info # Update timestamp self.store.set_last_update_check() return info def _fetch_latest_release(self) -> Optional[dict]: """Fetch latest release from Gitea API.""" try: data = _gitea_api("releases?limit=1") except Exception: return None if not data or not isinstance(data, list) or len(data) == 0: return None release = data[0] tag = release.get("tag_name", "") version = tag.lstrip("v") if not _version_newer(version, __version__): return None # Skip if user chose to skip this version skip = self.store.get_skip_version() if skip and version == skip: return None # Find platform-specific asset assets = release.get("assets", []) asset = self._get_platform_asset(assets) if not asset: return None return { "version": version, "tag": tag, "url": asset["browser_download_url"], "filename": asset["name"], "size": asset.get("size", 0), "changelog": release.get("body", ""), "sha256": self._extract_sha256(release.get("body", ""), asset["name"]), } def _get_platform_asset(self, assets: list[dict]) -> Optional[dict]: """Find the asset matching current platform.""" tag = _platform_tag() for asset in assets: name = asset.get("name", "") if tag in name: return asset return None def _extract_sha256(self, body: str, filename: str) -> Optional[str]: """Try to extract SHA256 hash from release body.""" # Common formats: `sha256: abc123...` or `abc123... filename` for line in body.splitlines(): if filename in line: m = re.search(r"[a-f0-9]{64}", line, re.IGNORECASE) if m: return m.group(0) # Also check for generic sha256 line m = re.search(r"sha256[:\s]+([a-f0-9]{64})", body, re.IGNORECASE) if m: return m.group(1) return None def download_update(self, url: str, progress_cb=None) -> Optional[str]: """Download update binary to temp dir. Returns path or None.""" try: headers = _get_auth_headers() req = urllib.request.Request(url, headers=headers, method="GET") resp = urllib.request.urlopen(req, timeout=120) total = int(resp.headers.get("Content-Length", 0)) downloaded = 0 # Save to temp file suffix = ".exe" if sys.platform == "win32" else "" fd, tmp_path = tempfile.mkstemp(suffix=suffix, prefix="sm_update_") with os.fdopen(fd, "wb") as f: while True: chunk = resp.read(65536) if not chunk: break f.write(chunk) downloaded += len(chunk) if progress_cb and total > 0: progress_cb(downloaded, total) resp.close() # Verify SHA256 if available if self._latest and self._latest.get("sha256"): expected = self._latest["sha256"].lower() sha = hashlib.sha256() with open(tmp_path, "rb") as f: while True: chunk = f.read(65536) if not chunk: break sha.update(chunk) actual = sha.hexdigest() if actual != expected: log.error(f"SHA256 mismatch: expected {expected}, got {actual}") os.remove(tmp_path) return None self._download_path = tmp_path return tmp_path except Exception as e: log.error(f"Download failed: {e}") return None def apply_update(self, binary_path: str): """Replace current exe and restart.""" if not getattr(sys, "frozen", False): log.info("Not a frozen exe, skipping apply") return False current_exe = sys.executable if sys.platform == "win32": return self._apply_windows(binary_path, current_exe) else: return self._apply_linux(binary_path, current_exe) def _apply_windows(self, new_exe: str, current_exe: str) -> bool: """Windows update: create hidden .vbs that runs .bat silently.""" try: tmp_dir = tempfile.gettempdir() bat_path = os.path.join(tmp_dir, "sm_update.bat") vbs_path = os.path.join(tmp_dir, "sm_update.vbs") pid = os.getpid() bat_content = f"""@echo off :wait tasklist /fi "PID eq {pid}" 2>NUL | find "{pid}" >NUL if %ERRORLEVEL% == 0 ( timeout /t 1 /nobreak >NUL goto wait ) copy /Y "{new_exe}" "{current_exe}" >NUL if %ERRORLEVEL% NEQ 0 ( exit /b 1 ) start "" "{current_exe}" del "{bat_path}" del "%~f0" """ with open(bat_path, "w") as f: f.write(bat_content) # VBS wrapper runs .bat completely hidden (no CMD flash) vbs_content = f'CreateObject("Wscript.Shell").Run """{bat_path}""", 0, False\n' with open(vbs_path, "w") as f: f.write(vbs_content) # Launch VBS detached — no visible window at all subprocess.Popen( ["wscript", vbs_path], creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP, ) return True except Exception as e: log.error(f"Windows update failed: {e}") return False def _apply_linux(self, new_exe: str, current_exe: str) -> bool: """Linux update: replace in-place and exec.""" try: os.replace(new_exe, current_exe) os.chmod(current_exe, 0o755) os.execv(current_exe, sys.argv) return True # won't reach here except Exception as e: log.error(f"Linux update failed: {e}") return False def _loop(self): """Background loop — check periodically.""" # Initial delay: 10 seconds after startup for _ in range(100): if not self._running: return time.sleep(0.1) while self._running: # Check if enough time passed since last check last_check = self.store.get_last_update_check() now = time.time() if not last_check or (now - last_check) >= _CHECK_INTERVAL: info = self.check_now() if info and self._gui_callback: mode = self.store.get_update_mode() if mode == "full-auto": # Auto-download and apply path = self.download_update(info["url"]) if path: try: self._gui_callback("auto_apply", info, path) except Exception: pass elif mode == "auto-download": # Download in background, then notify path = self.download_update(info["url"]) if path: try: self._gui_callback("downloaded", info, path) except Exception: pass else: try: self._gui_callback("available", info, None) except Exception: pass else: # notify-only try: self._gui_callback("available", info, None) except Exception: pass # Sleep for 5 minutes, checking _running flag for _ in range(3000): if not self._running: return time.sleep(0.1)