449 lines
15 KiB
Python
449 lines
15 KiB
Python
"""
|
|
Auto-updater — checks Gitea releases, downloads and applies updates.
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import platform
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from typing import TYPE_CHECKING, Optional
|
|
|
|
if TYPE_CHECKING:
|
|
from core.server_store import ServerStore
|
|
|
|
from core.logger import log
|
|
from version import __version__
|
|
|
|
|
|
# Check interval: 1 hour
|
|
_CHECK_INTERVAL = 3600
|
|
|
|
# Hide console windows on Windows for subprocess calls
|
|
_SUBPROCESS_FLAGS = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
|
|
|
|
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
|
|
def _platform_tag() -> str:
|
|
"""Get platform tag matching build.py naming: win-x64, linux-x64, etc."""
|
|
s = platform.system().lower()
|
|
m = platform.machine().lower()
|
|
arch_map = {
|
|
"x86_64": "x64", "amd64": "x64",
|
|
"x86": "x32", "i686": "x32", "i386": "x32",
|
|
"aarch64": "arm64", "arm64": "arm64",
|
|
"armv7l": "arm",
|
|
}
|
|
arch = arch_map.get(m, m)
|
|
os_map = {"windows": "win", "linux": "linux", "darwin": "mac"}
|
|
os_tag = os_map.get(s, s)
|
|
return f"{os_tag}-{arch}"
|
|
|
|
|
|
def _parse_version(ver: str) -> tuple[int, int, int]:
|
|
"""Parse semver string into (major, minor, patch)."""
|
|
m = re.match(r"v?(\d+)\.(\d+)\.(\d+)", ver)
|
|
if not m:
|
|
return (0, 0, 0)
|
|
return int(m.group(1)), int(m.group(2)), int(m.group(3))
|
|
|
|
|
|
def _version_newer(remote: str, local: str) -> bool:
|
|
"""Return True if remote version is newer than local."""
|
|
return _parse_version(remote) > _parse_version(local)
|
|
|
|
|
|
_GITEA_API_BASE = "https://git.sensey24.ru/api/v1/repos/aibot777/server-manager"
|
|
|
|
|
|
def _get_auth_headers() -> dict:
|
|
"""Try to get Gitea auth headers from git remote. Returns empty dict on failure."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "remote", "get-url", "sensey"],
|
|
capture_output=True, text=True, cwd=PROJECT_DIR,
|
|
creationflags=_SUBPROCESS_FLAGS,
|
|
)
|
|
remote_url = result.stdout.strip()
|
|
m = re.match(r"https://([^:]+):([^@]+)@", remote_url)
|
|
if m:
|
|
user, password = m.groups()
|
|
return {
|
|
"Authorization": "Basic " + base64.b64encode(
|
|
f"{user}:{password}".encode()
|
|
).decode()
|
|
}
|
|
except Exception:
|
|
pass
|
|
return {}
|
|
|
|
|
|
def _gitea_api(endpoint: str) -> Optional[dict]:
|
|
"""Call Gitea API. Tries git remote auth first, falls back to public API."""
|
|
url = f"{_GITEA_API_BASE}/{endpoint}"
|
|
req = urllib.request.Request(url, headers=_get_auth_headers(), method="GET")
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=15) as resp:
|
|
return json.loads(resp.read().decode())
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
class UpdateChecker:
|
|
"""Background thread that checks for new releases on Gitea."""
|
|
|
|
def __init__(self, store: "ServerStore", gui_callback=None):
|
|
self.store = store
|
|
self._gui_callback = gui_callback
|
|
self._running = False
|
|
self._thread: Optional[threading.Thread] = None
|
|
self._latest: Optional[dict] = None # cached latest release info
|
|
self._download_path: Optional[str] = None
|
|
|
|
def start(self):
|
|
if self._running:
|
|
return
|
|
self._running = True
|
|
self._thread = threading.Thread(target=self._loop, daemon=True)
|
|
self._thread.start()
|
|
|
|
def stop(self):
|
|
self._running = False
|
|
if self._thread:
|
|
self._thread.join(timeout=3.0)
|
|
|
|
@property
|
|
def latest(self) -> Optional[dict]:
|
|
"""Latest release info: {version, url, changelog, filename, sha256}."""
|
|
return self._latest
|
|
|
|
@property
|
|
def download_path(self) -> Optional[str]:
|
|
return self._download_path
|
|
|
|
def check_now(self) -> Optional[dict]:
|
|
"""Manual check — returns release info or None."""
|
|
info = self._fetch_latest_release()
|
|
if info:
|
|
self._latest = info
|
|
# Update timestamp
|
|
self.store.set_last_update_check()
|
|
return info
|
|
|
|
def _fetch_latest_release(self) -> Optional[dict]:
|
|
"""Fetch latest release from Gitea API."""
|
|
try:
|
|
data = _gitea_api("releases?limit=1")
|
|
except Exception:
|
|
return None
|
|
|
|
if not data or not isinstance(data, list) or len(data) == 0:
|
|
return None
|
|
|
|
release = data[0]
|
|
tag = release.get("tag_name", "")
|
|
version = tag.lstrip("v")
|
|
|
|
if not _version_newer(version, __version__):
|
|
return None
|
|
|
|
# Skip if user chose to skip this version
|
|
skip = self.store.get_skip_version()
|
|
if skip and version == skip:
|
|
return None
|
|
|
|
# Find platform-specific asset
|
|
assets = release.get("assets", [])
|
|
asset = self._get_platform_asset(assets)
|
|
if not asset:
|
|
return None
|
|
|
|
return {
|
|
"version": version,
|
|
"tag": tag,
|
|
"url": asset["browser_download_url"],
|
|
"filename": asset["name"],
|
|
"size": asset.get("size", 0),
|
|
"changelog": release.get("body", ""),
|
|
"sha256": self._extract_sha256(release.get("body", ""), asset["name"]),
|
|
}
|
|
|
|
def _get_platform_asset(self, assets: list[dict]) -> Optional[dict]:
|
|
"""Find the asset matching current platform."""
|
|
tag = _platform_tag()
|
|
for asset in assets:
|
|
name = asset.get("name", "")
|
|
if tag in name:
|
|
return asset
|
|
return None
|
|
|
|
def _extract_sha256(self, body: str, filename: str) -> Optional[str]:
|
|
"""Try to extract SHA256 hash from release body."""
|
|
# Common formats: `sha256: abc123...` or `abc123... filename`
|
|
for line in body.splitlines():
|
|
if filename in line:
|
|
m = re.search(r"[a-f0-9]{64}", line, re.IGNORECASE)
|
|
if m:
|
|
return m.group(0)
|
|
# Also check for generic sha256 line
|
|
m = re.search(r"sha256[:\s]+([a-f0-9]{64})", body, re.IGNORECASE)
|
|
if m:
|
|
return m.group(1)
|
|
return None
|
|
|
|
def download_update(self, url: str, progress_cb=None) -> Optional[str]:
|
|
"""Download update binary to temp dir. Returns path or None."""
|
|
try:
|
|
headers = _get_auth_headers()
|
|
req = urllib.request.Request(url, headers=headers, method="GET")
|
|
resp = urllib.request.urlopen(req, timeout=120)
|
|
|
|
total = int(resp.headers.get("Content-Length", 0))
|
|
downloaded = 0
|
|
|
|
# Save to temp file
|
|
suffix = ".exe" if sys.platform == "win32" else ""
|
|
fd, tmp_path = tempfile.mkstemp(suffix=suffix, prefix="sm_update_")
|
|
with os.fdopen(fd, "wb") as f:
|
|
while True:
|
|
chunk = resp.read(65536)
|
|
if not chunk:
|
|
break
|
|
f.write(chunk)
|
|
downloaded += len(chunk)
|
|
if progress_cb and total > 0:
|
|
progress_cb(downloaded, total)
|
|
|
|
resp.close()
|
|
|
|
# Verify SHA256 if available
|
|
if self._latest and self._latest.get("sha256"):
|
|
expected = self._latest["sha256"].lower()
|
|
sha = hashlib.sha256()
|
|
with open(tmp_path, "rb") as f:
|
|
while True:
|
|
chunk = f.read(65536)
|
|
if not chunk:
|
|
break
|
|
sha.update(chunk)
|
|
actual = sha.hexdigest()
|
|
if actual != expected:
|
|
log.error(f"SHA256 mismatch: expected {expected}, got {actual}")
|
|
os.remove(tmp_path)
|
|
return None
|
|
|
|
self._download_path = tmp_path
|
|
return tmp_path
|
|
|
|
except Exception as e:
|
|
log.error(f"Download failed: {e}")
|
|
return None
|
|
|
|
def apply_update(self, binary_path: str):
|
|
"""Replace current exe and restart."""
|
|
if not getattr(sys, "frozen", False):
|
|
log.info("Not a frozen exe, skipping apply")
|
|
return False
|
|
|
|
current_exe = sys.executable
|
|
|
|
if sys.platform == "win32":
|
|
return self._apply_windows(binary_path, current_exe)
|
|
else:
|
|
return self._apply_linux(binary_path, current_exe)
|
|
|
|
def _apply_windows(self, new_exe: str, current_exe: str) -> bool:
|
|
"""Windows update: BAT script waits for exit, copies, launches."""
|
|
try:
|
|
tmp_dir = tempfile.gettempdir()
|
|
bat_path = os.path.join(tmp_dir, "sm_update.bat")
|
|
log_path = os.path.join(tmp_dir, "sm_update.log")
|
|
pid = os.getpid()
|
|
new_size = os.path.getsize(new_exe)
|
|
|
|
bat_content = f"""@echo off
|
|
setlocal enabledelayedexpansion
|
|
chcp 65001 >nul 2>&1
|
|
set "LOGFILE={log_path}"
|
|
set "SRC={new_exe}"
|
|
set "DST={current_exe}"
|
|
set "PID={pid}"
|
|
set "TMPDIR={tmp_dir}"
|
|
set "EXPECTED_SIZE={new_size}"
|
|
|
|
echo [%date% %time%] Update script started >> "%LOGFILE%"
|
|
echo [%date% %time%] PID to wait for: %PID% >> "%LOGFILE%"
|
|
echo [%date% %time%] SRC: %SRC% >> "%LOGFILE%"
|
|
echo [%date% %time%] DST: %DST% >> "%LOGFILE%"
|
|
echo [%date% %time%] Expected size: %EXPECTED_SIZE% >> "%LOGFILE%"
|
|
|
|
:wait_loop
|
|
tasklist /FI "PID eq %PID%" 2>nul | find "%PID%" >nul
|
|
if %errorlevel%==0 (
|
|
echo [%date% %time%] Waiting for PID %PID% to exit... >> "%LOGFILE%"
|
|
timeout /t 1 /nobreak >nul
|
|
goto wait_loop
|
|
)
|
|
|
|
echo [%date% %time%] Process exited, waiting 3s... >> "%LOGFILE%"
|
|
timeout /t 3 /nobreak >nul
|
|
|
|
rem Clean stale _MEI directories
|
|
echo [%date% %time%] Cleaning _MEI directories... >> "%LOGFILE%"
|
|
for /d %%D in ("%TMPDIR%\\_MEI*") do (
|
|
rmdir /s /q "%%D" >nul 2>&1
|
|
)
|
|
timeout /t 1 /nobreak >nul
|
|
|
|
rem Log source file size
|
|
for %%F in ("%SRC%") do (
|
|
echo [%date% %time%] SRC file size: %%~zF >> "%LOGFILE%"
|
|
)
|
|
|
|
rem Delete old DST first so copy is clean
|
|
echo [%date% %time%] Deleting old DST... >> "%LOGFILE%"
|
|
del /f /q "%DST%" >nul 2>&1
|
|
if exist "%DST%" (
|
|
echo [%date% %time%] WARNING: could not delete old DST >> "%LOGFILE%"
|
|
)
|
|
timeout /t 1 /nobreak >nul
|
|
|
|
rem Copy with retry and size verification
|
|
echo [%date% %time%] Starting copy... >> "%LOGFILE%"
|
|
set COPIED=0
|
|
for /L %%i in (1,1,5) do (
|
|
if !COPIED!==0 (
|
|
echo [%date% %time%] Copy attempt %%i... >> "%LOGFILE%"
|
|
copy /Y /B "%SRC%" "%DST%" >> "%LOGFILE%" 2>&1
|
|
if exist "%DST%" (
|
|
for %%F in ("%DST%") do set DST_SIZE=%%~zF
|
|
echo [%date% %time%] DST size after copy: !DST_SIZE! >> "%LOGFILE%"
|
|
if "!DST_SIZE!"=="%EXPECTED_SIZE%" (
|
|
echo [%date% %time%] Size verified OK >> "%LOGFILE%"
|
|
set COPIED=1
|
|
) else (
|
|
echo [%date% %time%] Size mismatch! Expected %EXPECTED_SIZE%, got !DST_SIZE! >> "%LOGFILE%"
|
|
del /f /q "%DST%" >nul 2>&1
|
|
timeout /t 2 /nobreak >nul
|
|
)
|
|
) else (
|
|
echo [%date% %time%] Copy failed - DST does not exist >> "%LOGFILE%"
|
|
timeout /t 2 /nobreak >nul
|
|
)
|
|
)
|
|
)
|
|
|
|
if %COPIED%==0 (
|
|
echo [%date% %time%] FAILED: could not copy after 5 attempts >> "%LOGFILE%"
|
|
pause
|
|
goto cleanup
|
|
)
|
|
|
|
echo [%date% %time%] Copy successful, launching... >> "%LOGFILE%"
|
|
timeout /t 2 /nobreak >nul
|
|
|
|
rem Launch new exe
|
|
echo [%date% %time%] Starting: %DST% >> "%LOGFILE%"
|
|
start "" "%DST%"
|
|
|
|
echo [%date% %time%] Launch command issued >> "%LOGFILE%"
|
|
timeout /t 2 /nobreak >nul
|
|
|
|
:cleanup
|
|
rem Delete downloaded update file
|
|
del /f /q "%SRC%" >nul 2>&1
|
|
echo [%date% %time%] Cleanup done >> "%LOGFILE%"
|
|
|
|
rem Self-delete
|
|
del /f /q "%~f0" >nul 2>&1
|
|
"""
|
|
with open(bat_path, "w", encoding="utf-8") as f:
|
|
f.write(bat_content)
|
|
|
|
log.info(f"Update BAT: {bat_path}, log: {log_path}")
|
|
|
|
# Launch BAT minimized
|
|
subprocess.Popen(
|
|
["cmd.exe", "/c", "start", "/min", "", bat_path],
|
|
creationflags=_SUBPROCESS_FLAGS,
|
|
)
|
|
return True
|
|
|
|
except Exception as e:
|
|
log.error(f"Windows update failed: {e}")
|
|
return False
|
|
|
|
def _apply_linux(self, new_exe: str, current_exe: str) -> bool:
|
|
"""Linux update: replace in-place and exec."""
|
|
try:
|
|
os.replace(new_exe, current_exe)
|
|
os.chmod(current_exe, 0o755)
|
|
os.execv(current_exe, sys.argv)
|
|
return True # won't reach here
|
|
except Exception as e:
|
|
log.error(f"Linux update failed: {e}")
|
|
return False
|
|
|
|
def _loop(self):
|
|
"""Background loop — check periodically."""
|
|
# Initial delay: 10 seconds after startup
|
|
for _ in range(100):
|
|
if not self._running:
|
|
return
|
|
time.sleep(0.1)
|
|
|
|
first_run = True
|
|
while self._running:
|
|
# Check if enough time passed since last check
|
|
# On first run after startup, always check regardless of interval
|
|
last_check = self.store.get_last_update_check()
|
|
now = time.time()
|
|
|
|
if first_run or not last_check or (now - last_check) >= _CHECK_INTERVAL:
|
|
first_run = False
|
|
info = self.check_now()
|
|
if info and self._gui_callback:
|
|
mode = self.store.get_update_mode()
|
|
if mode == "full-auto":
|
|
# Auto-download and apply
|
|
path = self.download_update(info["url"])
|
|
if path:
|
|
try:
|
|
self._gui_callback("auto_apply", info, path)
|
|
except Exception:
|
|
pass
|
|
elif mode == "auto-download":
|
|
# Download in background, then notify
|
|
path = self.download_update(info["url"])
|
|
if path:
|
|
try:
|
|
self._gui_callback("downloaded", info, path)
|
|
except Exception:
|
|
pass
|
|
else:
|
|
try:
|
|
self._gui_callback("available", info, None)
|
|
except Exception:
|
|
pass
|
|
else:
|
|
# notify-only
|
|
try:
|
|
self._gui_callback("available", info, None)
|
|
except Exception:
|
|
pass
|
|
|
|
# Sleep for 5 minutes, checking _running flag
|
|
for _ in range(3000):
|
|
if not self._running:
|
|
return
|
|
time.sleep(0.1)
|