v1.8.78: auto-updater — Gitea releases check, download, apply

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
chrome-storm-c442
2026-03-01 09:00:27 -05:00
parent c23eb36dcc
commit 9393134593
8 changed files with 974 additions and 1 deletions

View File

@@ -453,6 +453,30 @@ _EN = {
"ctx_check_status": "Check Status",
"ctx_copy_alias": "Copy Alias",
"ctx_alias_copied": "Alias copied",
# Updates
"update_available": "ServerManager v{version} is available",
"update_available_title": "Update Available",
"update_check": "Check for Updates",
"update_checking": "Checking for updates...",
"update_download_install": "Download & Install",
"update_downloading": "Downloading...",
"update_downloaded": "Downloaded",
"update_install": "Update",
"update_restart": "Restart to Update",
"update_skip": "Skip",
"update_later": "Later",
"update_error": "Update failed",
"update_changelog": "What's New",
"update_no_changelog": "No changelog available.",
"update_current": "Current",
"update_ready": "Ready to install",
"update_mode": "Update Mode",
"update_mode_notify": "Notify Only",
"update_mode_download": "Auto-Download",
"update_mode_auto": "Full Auto",
"update_no_updates": "You're up to date!",
"update_not_frozen": "Updates only work in packaged (exe) mode",
}
_RU = {
@@ -883,6 +907,30 @@ _RU = {
"ctx_check_status": "Проверить статус",
"ctx_copy_alias": "Копировать алиас",
"ctx_alias_copied": "Алиас скопирован",
# Updates
"update_available": "Доступен ServerManager v{version}",
"update_available_title": "Доступно обновление",
"update_check": "Проверить обновления",
"update_checking": "Проверка обновлений...",
"update_download_install": "Скачать и установить",
"update_downloading": "Скачивание...",
"update_downloaded": "Скачано",
"update_install": "Обновить",
"update_restart": "Перезапустить",
"update_skip": "Пропустить",
"update_later": "Позже",
"update_error": "Ошибка обновления",
"update_changelog": "Что нового",
"update_no_changelog": "Описание изменений отсутствует.",
"update_current": "Текущая",
"update_ready": "Готово к установке",
"update_mode": "Режим обновлений",
"update_mode_notify": "Только уведомлять",
"update_mode_download": "Авто-скачивание",
"update_mode_auto": "Полный авто",
"update_no_updates": "У вас последняя версия!",
"update_not_frozen": "Обновления работают только в exe-режиме",
}
_ZH = {
@@ -1313,6 +1361,30 @@ _ZH = {
"ctx_check_status": "检查状态",
"ctx_copy_alias": "复制别名",
"ctx_alias_copied": "别名已复制",
# Updates
"update_available": "ServerManager v{version} 可用",
"update_available_title": "有可用更新",
"update_check": "检查更新",
"update_checking": "正在检查更新...",
"update_download_install": "下载并安装",
"update_downloading": "下载中...",
"update_downloaded": "已下载",
"update_install": "更新",
"update_restart": "重启更新",
"update_skip": "跳过",
"update_later": "稍后",
"update_error": "更新失败",
"update_changelog": "更新内容",
"update_no_changelog": "无更新日志。",
"update_current": "当前版本",
"update_ready": "准备安装",
"update_mode": "更新模式",
"update_mode_notify": "仅通知",
"update_mode_download": "自动下载",
"update_mode_auto": "全自动",
"update_no_updates": "已是最新版本!",
"update_not_frozen": "更新仅在打包(exe)模式下有效",
}
_TRANSLATIONS = {

View File

@@ -59,6 +59,10 @@ class ServerStore:
self._terminal_font_size: int = 11
self._window_geometry: str = ""
self._servers_file: str = DEFAULT_SERVERS_FILE
# Update settings
self._update_mode: str = "auto-download" # "notify-only" | "auto-download" | "full-auto"
self._last_update_check: float = 0
self._skip_version: str = ""
self._load_settings()
self._load()
@@ -79,6 +83,9 @@ class ServerStore:
self._check_interval = settings.get("check_interval", 60)
self._terminal_font_size = settings.get("terminal_font_size", 11)
self._window_geometry = settings.get("window_geometry", "")
self._update_mode = settings.get("update_mode", "auto-download")
self._last_update_check = settings.get("last_update_check", 0)
self._skip_version = settings.get("skip_version", "")
except json.JSONDecodeError:
log.warning("Corrupted settings.json, using defaults")
except Exception as e:
@@ -93,6 +100,9 @@ class ServerStore:
"check_interval": self._check_interval,
"terminal_font_size": self._terminal_font_size,
"window_geometry": self._window_geometry,
"update_mode": self._update_mode,
"last_update_check": self._last_update_check,
"skip_version": self._skip_version,
}
try:
tmp = SETTINGS_FILE + ".tmp"
@@ -413,3 +423,28 @@ class ServerStore:
def set_terminal_font_size(self, size: int):
self._terminal_font_size = max(6, min(28, size))
self._save_settings()
# ── Update settings ──────────────────────────────────
def get_update_mode(self) -> str:
return self._update_mode
def set_update_mode(self, mode: str):
if mode in ("notify-only", "auto-download", "full-auto"):
self._update_mode = mode
self._save_settings()
def get_last_update_check(self) -> float:
return self._last_update_check
def set_last_update_check(self):
import time
self._last_update_check = time.time()
self._save_settings()
def get_skip_version(self) -> str:
return self._skip_version
def set_skip_version(self, version: str):
self._skip_version = version
self._save_settings()

396
core/updater.py Normal file
View File

@@ -0,0 +1,396 @@
"""
Auto-updater — checks Gitea releases, downloads and applies updates.
"""
import base64
import hashlib
import json
import os
import platform
import re
import subprocess
import sys
import tempfile
import threading
import time
import urllib.error
import urllib.request
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from core.server_store import ServerStore
from core.logger import log
from version import __version__
# Check interval: 1 hour
_CHECK_INTERVAL = 3600
# Hide console windows on Windows for subprocess calls
_SUBPROCESS_FLAGS = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def _platform_tag() -> str:
"""Get platform tag matching build.py naming: win-x64, linux-x64, etc."""
s = platform.system().lower()
m = platform.machine().lower()
arch_map = {
"x86_64": "x64", "amd64": "x64",
"x86": "x32", "i686": "x32", "i386": "x32",
"aarch64": "arm64", "arm64": "arm64",
"armv7l": "arm",
}
arch = arch_map.get(m, m)
os_map = {"windows": "win", "linux": "linux", "darwin": "mac"}
os_tag = os_map.get(s, s)
return f"{os_tag}-{arch}"
def _parse_version(ver: str) -> tuple[int, int, int]:
"""Parse semver string into (major, minor, patch)."""
m = re.match(r"v?(\d+)\.(\d+)\.(\d+)", ver)
if not m:
return (0, 0, 0)
return int(m.group(1)), int(m.group(2)), int(m.group(3))
def _version_newer(remote: str, local: str) -> bool:
"""Return True if remote version is newer than local."""
return _parse_version(remote) > _parse_version(local)
def _gitea_api(endpoint: str) -> Optional[dict]:
"""Call Gitea API. Reads credentials from git remote 'sensey'."""
try:
result = subprocess.run(
["git", "remote", "get-url", "sensey"],
capture_output=True, text=True, cwd=PROJECT_DIR,
creationflags=_SUBPROCESS_FLAGS,
)
remote_url = result.stdout.strip()
except Exception:
# Frozen exe: no git available, try reading from embedded config
remote_url = ""
if not remote_url:
# Fallback: try common remote names
for name in ["origin"]:
try:
result = subprocess.run(
["git", "remote", "get-url", name],
capture_output=True, text=True, cwd=PROJECT_DIR,
creationflags=_SUBPROCESS_FLAGS,
)
if result.returncode == 0:
remote_url = result.stdout.strip()
break
except Exception:
pass
if not remote_url:
return None
m = re.match(r"https://([^:]+):([^@]+)@([^/]+)/(.+?)(?:\.git)?$", remote_url)
if not m:
return None
user, password, host, repo_path = m.groups()
url = f"https://{host}/api/v1/repos/{repo_path}/{endpoint}"
headers = {
"Authorization": "Basic " + base64.b64encode(
f"{user}:{password}".encode()
).decode(),
}
req = urllib.request.Request(url, headers=headers, method="GET")
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode())
except Exception:
return None
class UpdateChecker:
"""Background thread that checks for new releases on Gitea."""
def __init__(self, store: "ServerStore", gui_callback=None):
self.store = store
self._gui_callback = gui_callback
self._running = False
self._thread: Optional[threading.Thread] = None
self._latest: Optional[dict] = None # cached latest release info
self._download_path: Optional[str] = None
def start(self):
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
def stop(self):
self._running = False
if self._thread:
self._thread.join(timeout=3.0)
@property
def latest(self) -> Optional[dict]:
"""Latest release info: {version, url, changelog, filename, sha256}."""
return self._latest
@property
def download_path(self) -> Optional[str]:
return self._download_path
def check_now(self) -> Optional[dict]:
"""Manual check — returns release info or None."""
info = self._fetch_latest_release()
if info:
self._latest = info
# Update timestamp
self.store.set_last_update_check()
return info
def _fetch_latest_release(self) -> Optional[dict]:
"""Fetch latest release from Gitea API."""
try:
data = _gitea_api("releases?limit=1")
except Exception:
return None
if not data or not isinstance(data, list) or len(data) == 0:
return None
release = data[0]
tag = release.get("tag_name", "")
version = tag.lstrip("v")
if not _version_newer(version, __version__):
return None
# Skip if user chose to skip this version
skip = self.store.get_skip_version()
if skip and version == skip:
return None
# Find platform-specific asset
assets = release.get("assets", [])
asset = self._get_platform_asset(assets)
if not asset:
return None
return {
"version": version,
"tag": tag,
"url": asset["browser_download_url"],
"filename": asset["name"],
"size": asset.get("size", 0),
"changelog": release.get("body", ""),
"sha256": self._extract_sha256(release.get("body", ""), asset["name"]),
}
def _get_platform_asset(self, assets: list[dict]) -> Optional[dict]:
"""Find the asset matching current platform."""
tag = _platform_tag()
for asset in assets:
name = asset.get("name", "")
if tag in name:
return asset
return None
def _extract_sha256(self, body: str, filename: str) -> Optional[str]:
"""Try to extract SHA256 hash from release body."""
# Common formats: `sha256: abc123...` or `abc123... filename`
for line in body.splitlines():
if filename in line:
m = re.search(r"[a-f0-9]{64}", line, re.IGNORECASE)
if m:
return m.group(0)
# Also check for generic sha256 line
m = re.search(r"sha256[:\s]+([a-f0-9]{64})", body, re.IGNORECASE)
if m:
return m.group(1)
return None
def download_update(self, url: str, progress_cb=None) -> Optional[str]:
"""Download update binary to temp dir. Returns path or None."""
try:
# Get auth headers
result = subprocess.run(
["git", "remote", "get-url", "sensey"],
capture_output=True, text=True, cwd=PROJECT_DIR,
creationflags=_SUBPROCESS_FLAGS,
)
remote_url = result.stdout.strip()
headers = {}
m = re.match(r"https://([^:]+):([^@]+)@", remote_url)
if m:
user, password = m.groups()
headers["Authorization"] = "Basic " + base64.b64encode(
f"{user}:{password}".encode()
).decode()
req = urllib.request.Request(url, headers=headers, method="GET")
resp = urllib.request.urlopen(req, timeout=120)
total = int(resp.headers.get("Content-Length", 0))
downloaded = 0
# Save to temp file
suffix = ".exe" if sys.platform == "win32" else ""
fd, tmp_path = tempfile.mkstemp(suffix=suffix, prefix="sm_update_")
with os.fdopen(fd, "wb") as f:
while True:
chunk = resp.read(65536)
if not chunk:
break
f.write(chunk)
downloaded += len(chunk)
if progress_cb and total > 0:
progress_cb(downloaded, total)
resp.close()
# Verify SHA256 if available
if self._latest and self._latest.get("sha256"):
expected = self._latest["sha256"].lower()
sha = hashlib.sha256()
with open(tmp_path, "rb") as f:
while True:
chunk = f.read(65536)
if not chunk:
break
sha.update(chunk)
actual = sha.hexdigest()
if actual != expected:
log.error(f"SHA256 mismatch: expected {expected}, got {actual}")
os.remove(tmp_path)
return None
self._download_path = tmp_path
return tmp_path
except Exception as e:
log.error(f"Download failed: {e}")
return None
def apply_update(self, binary_path: str):
"""Replace current exe and restart."""
if not getattr(sys, "frozen", False):
log.info("Not a frozen exe, skipping apply")
return False
current_exe = sys.executable
if sys.platform == "win32":
return self._apply_windows(binary_path, current_exe)
else:
return self._apply_linux(binary_path, current_exe)
def _apply_windows(self, new_exe: str, current_exe: str) -> bool:
"""Windows update: create hidden .vbs that runs .bat silently."""
try:
tmp_dir = tempfile.gettempdir()
bat_path = os.path.join(tmp_dir, "sm_update.bat")
vbs_path = os.path.join(tmp_dir, "sm_update.vbs")
pid = os.getpid()
bat_content = f"""@echo off
:wait
tasklist /fi "PID eq {pid}" 2>NUL | find "{pid}" >NUL
if %ERRORLEVEL% == 0 (
timeout /t 1 /nobreak >NUL
goto wait
)
copy /Y "{new_exe}" "{current_exe}" >NUL
if %ERRORLEVEL% NEQ 0 (
exit /b 1
)
start "" "{current_exe}"
del "{bat_path}"
del "%~f0"
"""
with open(bat_path, "w") as f:
f.write(bat_content)
# VBS wrapper runs .bat completely hidden (no CMD flash)
vbs_content = f'CreateObject("Wscript.Shell").Run """{bat_path}""", 0, False\n'
with open(vbs_path, "w") as f:
f.write(vbs_content)
# Launch VBS detached — no visible window at all
subprocess.Popen(
["wscript", vbs_path],
creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP,
)
return True
except Exception as e:
log.error(f"Windows update failed: {e}")
return False
def _apply_linux(self, new_exe: str, current_exe: str) -> bool:
"""Linux update: replace in-place and exec."""
try:
os.replace(new_exe, current_exe)
os.chmod(current_exe, 0o755)
os.execv(current_exe, sys.argv)
return True # won't reach here
except Exception as e:
log.error(f"Linux update failed: {e}")
return False
def _loop(self):
"""Background loop — check periodically."""
# Initial delay: 10 seconds after startup
for _ in range(100):
if not self._running:
return
time.sleep(0.1)
while self._running:
# Check if enough time passed since last check
last_check = self.store.get_last_update_check()
now = time.time()
if not last_check or (now - last_check) >= _CHECK_INTERVAL:
info = self.check_now()
if info and self._gui_callback:
mode = self.store.get_update_mode()
if mode == "full-auto":
# Auto-download and apply
path = self.download_update(info["url"])
if path:
try:
self._gui_callback("auto_apply", info, path)
except Exception:
pass
elif mode == "auto-download":
# Download in background, then notify
path = self.download_update(info["url"])
if path:
try:
self._gui_callback("downloaded", info, path)
except Exception:
pass
else:
try:
self._gui_callback("available", info, None)
except Exception:
pass
else:
# notify-only
try:
self._gui_callback("available", info, None)
except Exception:
pass
# Sleep for 5 minutes, checking _running flag
for _ in range(3000):
if not self._running:
return
time.sleep(0.1)