Files
server-manager/core/updater.py
chrome-storm-c442 f2dc978c57 v1.8.99: fix update script — delete-before-copy, size verification, error logging
Previous BAT script falsely reported success because `if exist` checked
the pre-existing file, not the copy result. Now: deletes old DST first,
copies with /B (binary), verifies size matches expected, logs all errors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 03:12:54 -05:00

446 lines
14 KiB
Python

"""
Auto-updater — checks Gitea releases, downloads and applies updates.
"""
import base64
import hashlib
import json
import os
import platform
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from core.server_store import ServerStore

from core.logger import log
from version import __version__
# Check interval: 1 hour (minimum number of seconds between two release checks)
_CHECK_INTERVAL = 3600
# Hide console windows on Windows for subprocess calls
# (CREATE_NO_WINDOW on win32, 0 — i.e. no special flags — everywhere else)
_SUBPROCESS_FLAGS = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
# Directory one level above the directory containing this file; used as the
# working directory for the git lookup in _get_auth_headers().
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def _platform_tag() -> str:
    """Return the ``<os>-<arch>`` tag of the running platform.

    The tag follows the build.py naming scheme (win-x64, linux-x64, ...).
    OS or architecture names that have no alias are passed through
    unchanged, lowercased.
    """
    os_aliases = {"windows": "win", "linux": "linux", "darwin": "mac"}
    arch_aliases = {
        "x86_64": "x64", "amd64": "x64",
        "x86": "x32", "i686": "x32", "i386": "x32",
        "aarch64": "arm64", "arm64": "arm64",
        "armv7l": "arm",
    }
    system_name = platform.system().lower()
    machine_name = platform.machine().lower()
    parts = (
        os_aliases.get(system_name, system_name),
        arch_aliases.get(machine_name, machine_name),
    )
    return "-".join(parts)
def _parse_version(ver: str) -> tuple[int, int, int]:
    """Split a semver string into a ``(major, minor, patch)`` tuple of ints.

    An optional leading ``v`` is accepted and anything after the patch
    number is ignored. Strings that do not start with three dot-separated
    numbers yield ``(0, 0, 0)``.
    """
    match = re.match(r"v?(\d+)\.(\d+)\.(\d+)", ver)
    if match is None:
        return (0, 0, 0)
    major, minor, patch = (int(number) for number in match.groups())
    return major, minor, patch
def _version_newer(remote: str, local: str) -> bool:
    """Tell whether ``remote`` is a strictly newer version than ``local``.

    Both strings are parsed with ``_parse_version`` and the resulting
    (major, minor, patch) tuples are compared.
    """
    remote_parts = _parse_version(remote)
    local_parts = _parse_version(local)
    return remote_parts > local_parts
# Base URL of the Gitea v1 REST API for the server-manager repository;
# _gitea_api() appends the requested endpoint to it.
_GITEA_API_BASE = "https://git.sensey24.ru/api/v1/repos/aibot777/server-manager"
def _get_auth_headers() -> dict:
    """Build HTTP Basic auth headers from the ``sensey`` git remote URL.

    Runs ``git remote get-url sensey`` in ``PROJECT_DIR`` and, when the URL
    has the form ``https://user:password@host/...``, returns an
    ``Authorization: Basic ...`` header built from those credentials.

    Returns:
        A dict with a single ``Authorization`` entry, or an empty dict when
        git is unavailable, times out, or the URL carries no credentials.
    """
    try:
        result = subprocess.run(
            ["git", "remote", "get-url", "sensey"],
            capture_output=True, text=True, cwd=PROJECT_DIR,
            creationflags=_SUBPROCESS_FLAGS,
            # Never let a stuck git process block an update check or download.
            timeout=10,
        )
    except Exception:
        # Deliberate best-effort: any failure simply means "no credentials".
        return {}

    remote_url = result.stdout.strip()
    # Exclude "/" from both parts so "https://host:port/path@x" is not
    # mistaken for embedded credentials.
    m = re.match(r"https://([^:/@]+):([^@/]+)@", remote_url)
    if not m:
        return {}

    # Credentials inside a URL are percent-encoded (e.g. "@" -> "%40");
    # decode them before building the Basic auth token.
    user, password = (urllib.parse.unquote(part) for part in m.groups())
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return {"Authorization": "Basic " + token}
def _gitea_api(endpoint: str) -> Optional[dict]:
    """GET ``endpoint`` below the repository API base and decode the JSON reply.

    The request carries whatever ``_get_auth_headers()`` returns, so it is
    unauthenticated when no git remote credentials are found. A single
    request is made with a 15 second timeout.

    Returns:
        The decoded JSON payload, or None on any error (network failure,
        HTTP error status, invalid JSON, ...).
    """
    request = urllib.request.Request(
        f"{_GITEA_API_BASE}/{endpoint}",
        headers=_get_auth_headers(),
        method="GET",
    )
    try:
        with urllib.request.urlopen(request, timeout=15) as response:
            payload = response.read()
            return json.loads(payload.decode())
    except Exception:
        return None
class UpdateChecker:
    """Background thread that checks for new releases on Gitea.

    The worker started by :meth:`start` waits about 10 seconds, then wakes
    up every 5 minutes and performs a release check once at least
    ``_CHECK_INTERVAL`` seconds have passed since the timestamp kept by the
    store. Depending on ``store.get_update_mode()`` it either just notifies
    the GUI callback or downloads the update first.

    The GUI callback is invoked as ``gui_callback(event, info, path)`` where
    ``event`` is ``"available"``, ``"downloaded"`` or ``"auto_apply"``,
    ``info`` is the release dict and ``path`` is the downloaded file or None.
    """

    def __init__(self, store: "ServerStore", gui_callback=None):
        self.store = store
        self._gui_callback = gui_callback
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._latest: Optional[dict] = None  # cached latest release info
        self._download_path: Optional[str] = None

    def start(self):
        """Start the background checker thread (no-op if already running)."""
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Ask the background thread to stop and wait up to 3 seconds for it."""
        self._running = False
        worker = self._thread
        # Joining the current thread raises RuntimeError, so skip the join
        # when stop() is reached from the worker itself (e.g. via a callback).
        if worker and worker is not threading.current_thread():
            worker.join(timeout=3.0)

    @property
    def latest(self) -> Optional[dict]:
        """Latest release info: {version, url, changelog, filename, sha256}."""
        return self._latest

    @property
    def download_path(self) -> Optional[str]:
        """Path of the last successfully downloaded update, or None."""
        return self._download_path

    def check_now(self) -> Optional[dict]:
        """Manual check — returns release info or None.

        A found release is cached in ``latest``.
        """
        info = self._fetch_latest_release()
        if info:
            self._latest = info
        # Update timestamp.
        # NOTE(review): indentation was lost in the reviewed copy of this
        # file; the timestamp is assumed to be recorded on every check, not
        # only when an update was found — confirm against the repository.
        self.store.set_last_update_check()
        return info

    def _fetch_latest_release(self) -> Optional[dict]:
        """Fetch the latest release from the Gitea API.

        Returns:
            A dict with keys ``version``, ``tag``, ``url``, ``filename``,
            ``size``, ``changelog`` and ``sha256``; or None when the API call
            fails, the release is not newer than the running version, the
            user chose to skip it, or it has no usable asset for this
            platform.
        """
        try:
            data = _gitea_api("releases?limit=1")
        except Exception:
            return None
        if not isinstance(data, list) or not data:
            return None
        release = data[0]
        if not isinstance(release, dict):
            return None

        # JSON null values come back as None, so normalise with "or".
        tag = release.get("tag_name") or ""
        version = tag.lstrip("v")
        if not _version_newer(version, __version__):
            return None

        # Skip if user chose to skip this version
        skip = self.store.get_skip_version()
        if skip and version == skip:
            return None

        # Find platform-specific asset
        asset = self._get_platform_asset(release.get("assets") or [])
        if not asset:
            return None
        url = asset.get("browser_download_url")
        name = asset.get("name")
        if not url or not name:
            # An asset without a download URL cannot be installed.
            return None

        changelog = release.get("body") or ""
        return {
            "version": version,
            "tag": tag,
            "url": url,
            "filename": name,
            "size": asset.get("size", 0),
            "changelog": changelog,
            "sha256": self._extract_sha256(changelog, name),
        }

    def _get_platform_asset(self, assets: list[dict]) -> Optional[dict]:
        """Return the first asset whose name contains the platform tag, or None."""
        tag = _platform_tag()
        for asset in assets:
            if tag in (asset.get("name") or ""):
                return asset
        return None

    def _extract_sha256(self, body: str, filename: str) -> Optional[str]:
        """Try to extract the SHA256 hash for ``filename`` from a release body.

        Looks first for a 64-digit hex string on a line mentioning
        ``filename``, then for a generic ``sha256: <hash>`` entry anywhere in
        the body. Word boundaries keep a longer digest (e.g. SHA512) from
        being truncated into a bogus SHA256.

        Returns:
            The hash exactly as written in the body, or None if none is found.
        """
        # Common formats: `sha256: abc123...` or `abc123... filename`
        for line in body.splitlines():
            if filename in line:
                m = re.search(r"\b[a-f0-9]{64}\b", line, re.IGNORECASE)
                if m:
                    return m.group(0)
        # Also check for generic sha256 line
        m = re.search(r"sha256[:\s]+([a-f0-9]{64})\b", body, re.IGNORECASE)
        if m:
            return m.group(1)
        return None

    @staticmethod
    def _remove_quietly(path: Optional[str]) -> None:
        """Delete ``path`` if given, ignoring errors such as a missing file."""
        if not path:
            return
        try:
            os.remove(path)
        except OSError:
            pass

    def download_update(self, url: str, progress_cb=None) -> Optional[str]:
        """Download the update binary to a temp file.

        Args:
            url: Download URL of the release asset.
            progress_cb: Optional ``progress_cb(downloaded, total)`` callable,
                invoked after each chunk when the server reports a size.

        Returns:
            Path of the downloaded file, or None when the download fails, is
            incomplete, or does not match the SHA256 of the cached release.
            The temp file is removed on every failure path.
        """
        tmp_path: Optional[str] = None
        try:
            req = urllib.request.Request(
                url, headers=_get_auth_headers(), method="GET"
            )
            suffix = ".exe" if sys.platform == "win32" else ""
            sha = hashlib.sha256()
            downloaded = 0
            # The context manager closes the response even if a read fails.
            with urllib.request.urlopen(req, timeout=120) as resp:
                total = int(resp.headers.get("Content-Length", 0))
                # Save to temp file
                fd, tmp_path = tempfile.mkstemp(suffix=suffix, prefix="sm_update_")
                with os.fdopen(fd, "wb") as f:
                    while True:
                        chunk = resp.read(65536)
                        if not chunk:
                            break
                        f.write(chunk)
                        sha.update(chunk)  # hash while streaming, no re-read
                        downloaded += len(chunk)
                        if progress_cb and total > 0:
                            progress_cb(downloaded, total)

            # A connection dropped mid-transfer must not yield a "valid" file.
            if total > 0 and downloaded != total:
                raise OSError(
                    f"incomplete download: got {downloaded} of {total} bytes"
                )

            # Verify SHA256 if available
            if self._latest and self._latest.get("sha256"):
                expected = self._latest["sha256"].lower()
                actual = sha.hexdigest()
                if actual != expected:
                    log.error(f"SHA256 mismatch: expected {expected}, got {actual}")
                    self._remove_quietly(tmp_path)
                    return None

            self._download_path = tmp_path
            return tmp_path
        except Exception as e:
            log.error(f"Download failed: {e}")
            self._remove_quietly(tmp_path)
            return None

    def apply_update(self, binary_path: str) -> bool:
        """Replace the current executable with ``binary_path`` and restart.

        Only acts when running as a frozen executable (``sys.frozen`` set).

        Returns:
            False when not frozen or when applying failed; True when the
            Windows update script was launched. On other platforms a
            successful update replaces the process and never returns.
        """
        if not getattr(sys, "frozen", False):
            log.info("Not a frozen exe, skipping apply")
            return False
        current_exe = sys.executable
        if sys.platform == "win32":
            return self._apply_windows(binary_path, current_exe)
        return self._apply_linux(binary_path, current_exe)

    def _apply_windows(self, new_exe: str, current_exe: str) -> bool:
        """Windows update: BAT script waits for exit, copies, launches.

        Writes ``sm_update.bat`` into the temp directory and starts it
        minimized. The script waits for this process (by PID) to exit,
        removes ``_MEI*`` directories from the temp dir, deletes the old
        executable, copies the new one with up to 5 attempts (verifying the
        resulting size), launches it, and finally deletes the downloaded
        file and itself. Progress is appended to ``sm_update.log``.

        Returns:
            True when the script was launched, False on any error here.
        """
        try:
            tmp_dir = tempfile.gettempdir()
            bat_path = os.path.join(tmp_dir, "sm_update.bat")
            log_path = os.path.join(tmp_dir, "sm_update.log")
            pid = os.getpid()
            # Expected size of the copied file, checked by the script.
            new_size = os.path.getsize(new_exe)
            bat_content = f"""@echo off
setlocal enabledelayedexpansion
chcp 65001 >nul 2>&1
set "LOGFILE={log_path}"
set "SRC={new_exe}"
set "DST={current_exe}"
set "PID={pid}"
set "TMPDIR={tmp_dir}"
set "EXPECTED_SIZE={new_size}"
echo [%date% %time%] Update script started >> "%LOGFILE%"
echo [%date% %time%] PID to wait for: %PID% >> "%LOGFILE%"
echo [%date% %time%] SRC: %SRC% >> "%LOGFILE%"
echo [%date% %time%] DST: %DST% >> "%LOGFILE%"
echo [%date% %time%] Expected size: %EXPECTED_SIZE% >> "%LOGFILE%"
:wait_loop
tasklist /FI "PID eq %PID%" 2>nul | find "%PID%" >nul
if %errorlevel%==0 (
echo [%date% %time%] Waiting for PID %PID% to exit... >> "%LOGFILE%"
timeout /t 1 /nobreak >nul
goto wait_loop
)
echo [%date% %time%] Process exited, waiting 3s... >> "%LOGFILE%"
timeout /t 3 /nobreak >nul
rem Clean stale _MEI directories
echo [%date% %time%] Cleaning _MEI directories... >> "%LOGFILE%"
for /d %%D in ("%TMPDIR%\\_MEI*") do (
rmdir /s /q "%%D" >nul 2>&1
)
timeout /t 1 /nobreak >nul
rem Log source file size
for %%F in ("%SRC%") do (
echo [%date% %time%] SRC file size: %%~zF >> "%LOGFILE%"
)
rem Delete old DST first so copy is clean
echo [%date% %time%] Deleting old DST... >> "%LOGFILE%"
del /f /q "%DST%" >nul 2>&1
if exist "%DST%" (
echo [%date% %time%] WARNING: could not delete old DST >> "%LOGFILE%"
)
timeout /t 1 /nobreak >nul
rem Copy with retry and size verification
echo [%date% %time%] Starting copy... >> "%LOGFILE%"
set COPIED=0
for /L %%i in (1,1,5) do (
if !COPIED!==0 (
echo [%date% %time%] Copy attempt %%i... >> "%LOGFILE%"
copy /Y /B "%SRC%" "%DST%" >> "%LOGFILE%" 2>&1
if exist "%DST%" (
for %%F in ("%DST%") do set DST_SIZE=%%~zF
echo [%date% %time%] DST size after copy: !DST_SIZE! >> "%LOGFILE%"
if "!DST_SIZE!"=="%EXPECTED_SIZE%" (
echo [%date% %time%] Size verified OK >> "%LOGFILE%"
set COPIED=1
) else (
echo [%date% %time%] Size mismatch! Expected %EXPECTED_SIZE%, got !DST_SIZE! >> "%LOGFILE%"
del /f /q "%DST%" >nul 2>&1
timeout /t 2 /nobreak >nul
)
) else (
echo [%date% %time%] Copy failed - DST does not exist >> "%LOGFILE%"
timeout /t 2 /nobreak >nul
)
)
)
if %COPIED%==0 (
echo [%date% %time%] FAILED: could not copy after 5 attempts >> "%LOGFILE%"
pause
goto cleanup
)
echo [%date% %time%] Copy successful, launching... >> "%LOGFILE%"
timeout /t 2 /nobreak >nul
rem Launch new exe
echo [%date% %time%] Starting: %DST% >> "%LOGFILE%"
start "" "%DST%"
echo [%date% %time%] Launch command issued >> "%LOGFILE%"
timeout /t 2 /nobreak >nul
:cleanup
rem Delete downloaded update file
del /f /q "%SRC%" >nul 2>&1
echo [%date% %time%] Cleanup done >> "%LOGFILE%"
rem Self-delete
del /f /q "%~f0" >nul 2>&1
"""
            with open(bat_path, "w", encoding="utf-8") as f:
                f.write(bat_content)
            log.info(f"Update BAT: {bat_path}, log: {log_path}")
            # Launch BAT minimized
            subprocess.Popen(
                ["cmd.exe", "/c", "start", "/min", "", bat_path],
                creationflags=_SUBPROCESS_FLAGS,
            )
            return True
        except Exception as e:
            log.error(f"Windows update failed: {e}")
            return False

    def _apply_linux(self, new_exe: str, current_exe: str) -> bool:
        """Linux update: swap the executable in place and re-exec.

        The download lives in the temp directory, which may be on a
        different filesystem than the executable; a direct ``os.replace``
        fails there. The new binary is therefore copied next to the target,
        made executable, and then renamed over it, so the swap stays atomic.

        Returns:
            False on failure. On success ``os.execv`` replaces the process,
            so this method does not return.
        """
        staged = f"{current_exe}.new"
        try:
            shutil.copyfile(new_exe, staged)
            # Set the mode before the swap so the target is never
            # momentarily non-executable.
            os.chmod(staged, 0o755)
            os.replace(staged, current_exe)
            self._remove_quietly(new_exe)
            os.execv(current_exe, sys.argv)
            return True  # won't reach here
        except Exception as e:
            log.error(f"Linux update failed: {e}")
            self._remove_quietly(staged)
            return False

    def _sleep_while_running(self, ticks: int) -> bool:
        """Sleep ``ticks`` x 0.1 s; return False as soon as a stop is requested."""
        for _ in range(ticks):
            if not self._running:
                return False
            time.sleep(0.1)
        return True

    def _notify(self, event: str, info: dict, path: Optional[str]) -> None:
        """Invoke the GUI callback, logging (not propagating) its errors."""
        try:
            self._gui_callback(event, info, path)
        except Exception as e:
            log.error(f"Update callback '{event}' failed: {e}")

    def _check_and_notify(self) -> None:
        """Run one release check if it is due and report the result to the GUI."""
        # Check if enough time passed since last check
        last_check = self.store.get_last_update_check()
        now = time.time()
        if last_check and (now - last_check) < _CHECK_INTERVAL:
            return

        info = self.check_now()
        if not info or not self._gui_callback:
            return

        mode = self.store.get_update_mode()
        if mode == "full-auto":
            # Auto-download and apply
            path = self.download_update(info["url"])
            if path:
                self._notify("auto_apply", info, path)
        elif mode == "auto-download":
            # Download in background, then notify; fall back to a plain
            # "available" notification when the download fails.
            path = self.download_update(info["url"])
            if path:
                self._notify("downloaded", info, path)
            else:
                self._notify("available", info, None)
        else:
            # notify-only
            self._notify("available", info, None)

    def _loop(self):
        """Background loop — check periodically until stopped."""
        # Initial delay: 10 seconds after startup
        if not self._sleep_while_running(100):
            return
        while self._running:
            try:
                self._check_and_notify()
            except Exception as e:
                # One failed iteration must not kill the checker thread.
                log.error(f"Update check failed: {e}")
            # Sleep for 5 minutes, checking _running flag
            if not self._sleep_while_running(3000):
                return