Files
server-manager/core/updater.py
2026-03-01 09:36:01 -05:00

365 lines
12 KiB
Python

"""
Auto-updater — checks Gitea releases, downloads and applies updates.
"""
import base64
import hashlib
import json
import os
import platform
import re
import subprocess
import sys
import tempfile
import threading
import time
import urllib.error
import urllib.request
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from core.server_store import ServerStore
from core.logger import log
from version import __version__
# Check interval: 1 hour
_CHECK_INTERVAL = 3600
# Hide console windows on Windows for subprocess calls
_SUBPROCESS_FLAGS = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def _platform_tag() -> str:
"""Get platform tag matching build.py naming: win-x64, linux-x64, etc."""
s = platform.system().lower()
m = platform.machine().lower()
arch_map = {
"x86_64": "x64", "amd64": "x64",
"x86": "x32", "i686": "x32", "i386": "x32",
"aarch64": "arm64", "arm64": "arm64",
"armv7l": "arm",
}
arch = arch_map.get(m, m)
os_map = {"windows": "win", "linux": "linux", "darwin": "mac"}
os_tag = os_map.get(s, s)
return f"{os_tag}-{arch}"
def _parse_version(ver: str) -> tuple[int, int, int]:
"""Parse semver string into (major, minor, patch)."""
m = re.match(r"v?(\d+)\.(\d+)\.(\d+)", ver)
if not m:
return (0, 0, 0)
return int(m.group(1)), int(m.group(2)), int(m.group(3))
def _version_newer(remote: str, local: str) -> bool:
"""Return True if remote version is newer than local."""
return _parse_version(remote) > _parse_version(local)
_GITEA_API_BASE = "https://git.sensey24.ru/api/v1/repos/aibot777/server-manager"
def _get_auth_headers() -> dict:
"""Try to get Gitea auth headers from git remote. Returns empty dict on failure."""
try:
result = subprocess.run(
["git", "remote", "get-url", "sensey"],
capture_output=True, text=True, cwd=PROJECT_DIR,
creationflags=_SUBPROCESS_FLAGS,
)
remote_url = result.stdout.strip()
m = re.match(r"https://([^:]+):([^@]+)@", remote_url)
if m:
user, password = m.groups()
return {
"Authorization": "Basic " + base64.b64encode(
f"{user}:{password}".encode()
).decode()
}
except Exception:
pass
return {}
def _gitea_api(endpoint: str) -> Optional[dict]:
"""Call Gitea API. Tries git remote auth first, falls back to public API."""
url = f"{_GITEA_API_BASE}/{endpoint}"
req = urllib.request.Request(url, headers=_get_auth_headers(), method="GET")
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode())
except Exception:
return None
class UpdateChecker:
"""Background thread that checks for new releases on Gitea."""
def __init__(self, store: "ServerStore", gui_callback=None):
self.store = store
self._gui_callback = gui_callback
self._running = False
self._thread: Optional[threading.Thread] = None
self._latest: Optional[dict] = None # cached latest release info
self._download_path: Optional[str] = None
def start(self):
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
def stop(self):
self._running = False
if self._thread:
self._thread.join(timeout=3.0)
@property
def latest(self) -> Optional[dict]:
"""Latest release info: {version, url, changelog, filename, sha256}."""
return self._latest
@property
def download_path(self) -> Optional[str]:
return self._download_path
def check_now(self) -> Optional[dict]:
"""Manual check — returns release info or None."""
info = self._fetch_latest_release()
if info:
self._latest = info
# Update timestamp
self.store.set_last_update_check()
return info
def _fetch_latest_release(self) -> Optional[dict]:
"""Fetch latest release from Gitea API."""
try:
data = _gitea_api("releases?limit=1")
except Exception:
return None
if not data or not isinstance(data, list) or len(data) == 0:
return None
release = data[0]
tag = release.get("tag_name", "")
version = tag.lstrip("v")
if not _version_newer(version, __version__):
return None
# Skip if user chose to skip this version
skip = self.store.get_skip_version()
if skip and version == skip:
return None
# Find platform-specific asset
assets = release.get("assets", [])
asset = self._get_platform_asset(assets)
if not asset:
return None
return {
"version": version,
"tag": tag,
"url": asset["browser_download_url"],
"filename": asset["name"],
"size": asset.get("size", 0),
"changelog": release.get("body", ""),
"sha256": self._extract_sha256(release.get("body", ""), asset["name"]),
}
def _get_platform_asset(self, assets: list[dict]) -> Optional[dict]:
"""Find the asset matching current platform."""
tag = _platform_tag()
for asset in assets:
name = asset.get("name", "")
if tag in name:
return asset
return None
def _extract_sha256(self, body: str, filename: str) -> Optional[str]:
"""Try to extract SHA256 hash from release body."""
# Common formats: `sha256: abc123...` or `abc123... filename`
for line in body.splitlines():
if filename in line:
m = re.search(r"[a-f0-9]{64}", line, re.IGNORECASE)
if m:
return m.group(0)
# Also check for generic sha256 line
m = re.search(r"sha256[:\s]+([a-f0-9]{64})", body, re.IGNORECASE)
if m:
return m.group(1)
return None
def download_update(self, url: str, progress_cb=None) -> Optional[str]:
"""Download update binary to temp dir. Returns path or None."""
try:
headers = _get_auth_headers()
req = urllib.request.Request(url, headers=headers, method="GET")
resp = urllib.request.urlopen(req, timeout=120)
total = int(resp.headers.get("Content-Length", 0))
downloaded = 0
# Save to temp file
suffix = ".exe" if sys.platform == "win32" else ""
fd, tmp_path = tempfile.mkstemp(suffix=suffix, prefix="sm_update_")
with os.fdopen(fd, "wb") as f:
while True:
chunk = resp.read(65536)
if not chunk:
break
f.write(chunk)
downloaded += len(chunk)
if progress_cb and total > 0:
progress_cb(downloaded, total)
resp.close()
# Verify SHA256 if available
if self._latest and self._latest.get("sha256"):
expected = self._latest["sha256"].lower()
sha = hashlib.sha256()
with open(tmp_path, "rb") as f:
while True:
chunk = f.read(65536)
if not chunk:
break
sha.update(chunk)
actual = sha.hexdigest()
if actual != expected:
log.error(f"SHA256 mismatch: expected {expected}, got {actual}")
os.remove(tmp_path)
return None
self._download_path = tmp_path
return tmp_path
except Exception as e:
log.error(f"Download failed: {e}")
return None
def apply_update(self, binary_path: str):
"""Replace current exe and restart."""
if not getattr(sys, "frozen", False):
log.info("Not a frozen exe, skipping apply")
return False
current_exe = sys.executable
if sys.platform == "win32":
return self._apply_windows(binary_path, current_exe)
else:
return self._apply_linux(binary_path, current_exe)
def _apply_windows(self, new_exe: str, current_exe: str) -> bool:
"""Windows update: create hidden .vbs that runs .bat silently."""
try:
tmp_dir = tempfile.gettempdir()
bat_path = os.path.join(tmp_dir, "sm_update.bat")
vbs_path = os.path.join(tmp_dir, "sm_update.vbs")
pid = os.getpid()
bat_content = f"""@echo off
:wait
tasklist /fi "PID eq {pid}" 2>NUL | find "{pid}" >NUL
if %ERRORLEVEL% == 0 (
timeout /t 1 /nobreak >NUL
goto wait
)
copy /Y "{new_exe}" "{current_exe}" >NUL
if %ERRORLEVEL% NEQ 0 (
exit /b 1
)
start "" "{current_exe}"
del "{bat_path}"
del "%~f0"
"""
with open(bat_path, "w") as f:
f.write(bat_content)
# VBS wrapper runs .bat completely hidden (no CMD flash)
vbs_content = f'CreateObject("Wscript.Shell").Run """{bat_path}""", 0, False\n'
with open(vbs_path, "w") as f:
f.write(vbs_content)
# Launch VBS via os.startfile — most reliable on Windows,
# works from frozen exe without PATH issues
os.startfile(vbs_path)
return True
except Exception as e:
log.error(f"Windows update failed: {e}")
return False
def _apply_linux(self, new_exe: str, current_exe: str) -> bool:
"""Linux update: replace in-place and exec."""
try:
os.replace(new_exe, current_exe)
os.chmod(current_exe, 0o755)
os.execv(current_exe, sys.argv)
return True # won't reach here
except Exception as e:
log.error(f"Linux update failed: {e}")
return False
def _loop(self):
"""Background loop — check periodically."""
# Initial delay: 10 seconds after startup
for _ in range(100):
if not self._running:
return
time.sleep(0.1)
while self._running:
# Check if enough time passed since last check
last_check = self.store.get_last_update_check()
now = time.time()
if not last_check or (now - last_check) >= _CHECK_INTERVAL:
info = self.check_now()
if info and self._gui_callback:
mode = self.store.get_update_mode()
if mode == "full-auto":
# Auto-download and apply
path = self.download_update(info["url"])
if path:
try:
self._gui_callback("auto_apply", info, path)
except Exception:
pass
elif mode == "auto-download":
# Download in background, then notify
path = self.download_update(info["url"])
if path:
try:
self._gui_callback("downloaded", info, path)
except Exception:
pass
else:
try:
self._gui_callback("available", info, None)
except Exception:
pass
else:
# notify-only
try:
self._gui_callback("available", info, None)
except Exception:
pass
# Sleep for 5 minutes, checking _running flag
for _ in range(3000):
if not self._running:
return
time.sleep(0.1)