533 lines
18 KiB
Python
533 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Gemini Code Patcher — patches Gemini CLI to route through custom AI proxy.
|
|
|
|
Targets:
|
|
1. gemini_base_url — hardcoded generativelanguage.googleapis.com → proxy
|
|
2. vertex_base_url — hardcoded aiplatform.googleapis.com → proxy
|
|
3. sanitize_env_url — allow : and @ in URL env vars
|
|
4. auth_env_whitelist — add GOOGLE_GEMINI_BASE_URL and GOOGLE_VERTEX_BASE_URL to whitelist
|
|
5. user_settings — settings.json (auth + telemetry)
|
|
6. system_env — env vars injection
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import shutil
|
|
import platform
|
|
import subprocess
|
|
import argparse
|
|
from pathlib import Path
|
|
|
|
# ─── Constants ──────────────────────────────────────────────────────────
|
|
|
|
# Directory containing this script; the default config file sits next to it.
SCRIPT_DIR = Path(__file__).resolve().parent
CONFIG_PATH = SCRIPT_DIR.joinpath("gemini_config.json")

# Host platform flags, derived from a single platform.system() lookup.
_PLATFORM_NAME = platform.system()
IS_WINDOWS = _PLATFORM_NAME == "Windows"
IS_MACOS = _PLATFORM_NAME == "Darwin"

# npm package name, plus the two files (relative to the package root)
# that the patcher rewrites.
NPM_PACKAGE = "@google/gemini-cli"
GENAI_SUBPATH = "node_modules/@google/genai/dist/node/index.mjs"
SETTINGS_JS_SUBPATH = "dist/src/config/settings.js"

# ANSI colors
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
CYAN = "\033[96m"
BOLD = "\033[1m"
RESET = "\033[0m"

# Comment prepended to a patched index.mjs so a re-run can detect it.
PATCH_MARKER = "/* GEMINI_PATCHED */"
|
|
|
|
|
|
# ─── Utilities ──────────────────────────────────────────────────────────
|
|
|
|
def eprint(*args, **kwargs):
    """Print to standard error; accepts the same arguments as print()."""
    # Resolve the stream at call time so a redirected sys.stderr is honoured.
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
|
|
|
|
|
|
def color(text, c):
    """Wrap *text* in the escape sequence *c* followed by RESET."""
    return "{}{}{}".format(c, text, RESET)
|
|
|
|
|
|
# ─── Detection ──────────────────────────────────────────────────────────
|
|
|
|
def default_gemini_paths():
    """Return the candidate install directories for the Gemini CLI package.

    On Windows the candidates are built from the APPDATA and LOCALAPPDATA
    environment variables (each skipped when unset or empty); on macOS and
    on every other platform a fixed list of global node_modules locations
    is returned.
    """
    if IS_WINDOWS:
        bases = (os.environ.get(var, "") for var in ("APPDATA", "LOCALAPPDATA"))
        return [
            os.path.join(base, "npm", "node_modules", NPM_PACKAGE)
            for base in bases
            if base
        ]

    if IS_MACOS:
        lib_dirs = ("/opt/homebrew/lib", "/usr/local/lib", "/usr/lib")
    else:
        lib_dirs = ("/usr/lib", "/usr/local/lib")
    return [f"{lib_dir}/node_modules/{NPM_PACKAGE}" for lib_dir in lib_dirs]
|
|
|
|
|
|
def detect_gemini():
    """Locate the Gemini CLI installation.

    Walks the candidates from default_gemini_paths() in order and picks the
    first root whose bundled genai index.mjs exists. The settings.js path is
    derived from the same root but is not checked for existence here.

    Returns:
        Tuple (gemini_root, genai_mjs, settings_js).

    Raises:
        FileNotFoundError: if no candidate contains the genai index.mjs.
    """
    candidates = default_gemini_paths()
    for candidate in candidates:
        genai_file = os.path.join(candidate, GENAI_SUBPATH)
        if not os.path.isfile(genai_file):
            continue
        return candidate, genai_file, os.path.join(candidate, SETTINGS_JS_SUBPATH)

    raise FileNotFoundError(f"Gemini CLI not found. Checked: {candidates}")
|
|
|
|
|
|
def read_version(gemini_root):
    """Read the Gemini CLI version from <gemini_root>/package.json.

    The version is only used for display, so every failure mode (missing or
    unreadable file, undecodable bytes, malformed JSON, a top-level value
    that is not an object, no "version" key) yields "unknown" instead of
    raising.

    Args:
        gemini_root: Directory of the installed package.

    Returns:
        The "version" value from package.json, or "unknown".
    """
    pkg_path = os.path.join(gemini_root, "package.json")
    try:
        # Explicit UTF-8, matching the other file reads in this script,
        # instead of relying on the platform default encoding.
        with open(pkg_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return "unknown"

    if not isinstance(data, dict):
        return "unknown"
    return data.get("version", "unknown")
|
|
|
|
|
|
# ─── Config ─────────────────────────────────────────────────────────────
|
|
|
|
def load_config(config_path=None):
    """Parse the patcher's JSON config file.

    Args:
        config_path: Path to the config file; when falsy, CONFIG_PATH is used.

    Returns:
        The parsed JSON content.

    Raises:
        FileNotFoundError: if the chosen path is not an existing file.
    """
    target = config_path if config_path else CONFIG_PATH
    if os.path.isfile(target):
        with open(target, "r") as handle:
            return json.load(handle)
    raise FileNotFoundError(f"Config not found: {target}")
|
|
|
|
|
|
# ─── Target 1+2: Patch genai URLs ──────────────────────────────────────
|
|
|
|
def patch_genai_urls(genai_mjs_path, config):
    """Rewrite the hardcoded Google API URLs in the genai index.mjs.

    Target 1: generativelanguage.googleapis.com -> proxy
    Target 2: aiplatform.googleapis.com (regional and global) -> proxy

    The proxy URL is taken from config["base_url"]. A copy of the untouched
    file is saved as "<path>.backup" (only if no backup exists yet) before
    the file is overwritten, and PATCH_MARKER is prepended so a later run
    can tell the file has already been processed.

    Args:
        genai_mjs_path: Path to @google/genai/dist/node/index.mjs.
        config: Mapping that must contain "base_url".

    Returns:
        Tuple (ok, message). ok is False when the file is missing or no URL
        pattern matched; True when patched or already patched.
    """
    if not os.path.isfile(genai_mjs_path):
        return False, f"File not found: {genai_mjs_path}"

    with open(genai_mjs_path, "r", encoding="utf-8") as f:
        content = f.read()

    base_url_noslash = config["base_url"].rstrip("/")
    base_url = base_url_noslash + "/"
    changes = 0

    # Check if already patched
    if PATCH_MARKER in content:
        return True, "Already patched (marker found)"

    # Replacements are callables rather than template strings so that the
    # configured URL is inserted literally; a template would have any
    # backslash in the URL parsed as a regex escape / group reference.
    rules = (
        # Target 1: Gemini API base URL
        (
            r'(initHttpOptions\.baseUrl\s*=\s*)`https://generativelanguage\.googleapis\.com/`',
            lambda m: f"{m.group(1)}`{base_url}`",
        ),
        # Target 2a: Regional Vertex AI endpoint
        (
            r"return\s*`https://\$\{this\.clientOptions\.location\}-aiplatform\.googleapis\.com/`",
            lambda m: f"return `{base_url}`",
        ),
        # Target 2b: Global Vertex AI endpoint
        (
            r"return\s*`https://aiplatform\.googleapis\.com/`",
            lambda m: f"return `{base_url}`",
        ),
        # Target 1b: baseURL fallback (genai SDK >=1.30)
        (
            r'(baseURL:\s*baseURL\s*\|\|\s*)`https://generativelanguage\.googleapis\.com`',
            lambda m: f"{m.group(1)}`{base_url_noslash}`",
        ),
        # Target 1c: baseURLOverridden default check (genai SDK >=1.30)
        (
            r"(this\.baseURL\s*!==\s*)'https://generativelanguage\.googleapis\.com'",
            lambda m: f"{m.group(1)}'{base_url_noslash}'",
        ),
    )
    for pattern, build_replacement in rules:
        content, n = re.subn(pattern, build_replacement, content)
        changes += n

    # Catch-all: replace every remaining occurrence of the default host
    # (this is a plain text replacement, so comments are affected too).
    default_host = "generativelanguage.googleapis.com"
    default_url = "https://" + default_host

    # Whole URLs first, so that an http:// proxy does not end up behind the
    # original https:// scheme. Skipped when the configured URL carries no
    # scheme of its own, in which case the original scheme is kept.
    if base_url_noslash.startswith(("http://", "https://")):
        full_hits = content.count(default_url)
        if full_hits:
            content = content.replace(default_url, base_url_noslash)
            changes += full_hits

    # Then any bare host names that are left.
    remaining = content.count(default_host)
    if remaining > 0:
        proxy_host = base_url_noslash.replace("https://", "").replace("http://", "")
        content = content.replace(default_host, proxy_host)
        changes += remaining

    if changes == 0:
        return False, "No URL patterns matched — structure may have changed"

    # Add patch marker at the top
    content = PATCH_MARKER + "\n" + content

    # Create backup first; an existing backup is never overwritten so it
    # keeps holding the unpatched file.
    backup_path = genai_mjs_path + ".backup"
    if not os.path.exists(backup_path):
        shutil.copy2(genai_mjs_path, backup_path)

    with open(genai_mjs_path, "w", encoding="utf-8") as f:
        f.write(content)

    return True, f"Patched {changes} URL(s) in genai index.mjs"
|
|
|
|
|
|
# ─── Target 3+4: Patch settings.js ─────────────────────────────────────
|
|
|
|
def patch_settings_js(settings_js_path):
    """Patch the Gemini CLI settings.js so proxy URL env vars pass through.

    Target 3: Fix sanitizeEnvVar to allow : and @ in URLs.
    Target 4: Add GOOGLE_GEMINI_BASE_URL and GOOGLE_VERTEX_BASE_URL to
              AUTH_ENV_VAR_WHITELIST.

    The function is idempotent: when nothing needs changing because both
    targets are already present in the file, it reports success instead of
    a pattern mismatch. A copy of the file is saved as "<path>.backup"
    (only if no backup exists yet) before it is overwritten.

    Args:
        settings_js_path: Path to dist/src/config/settings.js.

    Returns:
        Tuple (ok, message). ok is False when the file is missing or when
        an unpatched target could not be located.
    """
    if not os.path.isfile(settings_js_path):
        return False, f"File not found: {settings_js_path}"

    with open(settings_js_path, "r", encoding="utf-8") as f:
        content = f.read()

    changes = 0

    # Target 3: sanitizeEnvVar — allow : and @ in regex
    old_sanitize = "return value.replace(/[^a-zA-Z0-9\\-_./]/g, '');"
    new_sanitize = "return value.replace(/[^a-zA-Z0-9\\-_./:@]/g, '');"
    if old_sanitize in content:
        content = content.replace(old_sanitize, new_sanitize, 1)
        changes += 1

    # Target 4: AUTH_ENV_VAR_WHITELIST — add BASE_URL entries
    if "GOOGLE_GEMINI_BASE_URL" not in content:
        old_whitelist = "'GOOGLE_CLOUD_LOCATION',\n];"
        new_whitelist = (
            "'GOOGLE_CLOUD_LOCATION',\n"
            " 'GOOGLE_GEMINI_BASE_URL',\n"
            " 'GOOGLE_VERTEX_BASE_URL',\n"
            "];"
        )
        # Alternate formatting of the closing bracket
        old_whitelist2 = "'GOOGLE_CLOUD_LOCATION',\n ];"
        new_whitelist2 = (
            "'GOOGLE_CLOUD_LOCATION',\n"
            " 'GOOGLE_GEMINI_BASE_URL',\n"
            " 'GOOGLE_VERTEX_BASE_URL',\n"
            " ];"
        )
        if old_whitelist in content:
            content = content.replace(old_whitelist, new_whitelist, 1)
            changes += 1
        elif old_whitelist2 in content:
            content = content.replace(old_whitelist2, new_whitelist2, 1)
            changes += 1

    if changes == 0:
        # Nothing was rewritten. That is a success if both targets are
        # already in place (e.g. a previous run of this patcher), and a
        # failure only if an unpatched target could not be found.
        if new_sanitize in content and "GOOGLE_GEMINI_BASE_URL" in content:
            return True, "Already patched (settings.js targets present)"
        return False, "No settings.js patterns matched"

    # Backup; an existing backup is never overwritten so it keeps holding
    # the unpatched file.
    backup_path = settings_js_path + ".backup"
    if not os.path.exists(backup_path):
        shutil.copy2(settings_js_path, backup_path)

    with open(settings_js_path, "w", encoding="utf-8") as f:
        f.write(content)

    return True, f"Patched {changes} target(s) in settings.js"
|
|
|
|
|
|
# ─── Target 5: User settings.json ──────────────────────────────────────
|
|
|
|
def patch_user_settings(config, home_dir=None):
    """Write auth and telemetry settings into <home>/.gemini/settings.json.

    Target 5. Existing settings are kept and merged with:
      - security.auth.selectedType = "gemini-api-key"
      - telemetry.enabled = config.get("telemetry_enabled", False)
      - telemetry.logPrompts = False

    If the existing file cannot be parsed as a JSON object, its content is
    copied to "settings.json.backup" (only if no backup exists yet) before
    a fresh settings file is written, so nothing is discarded silently.
    A "security", "security.auth" or "telemetry" value that is not an
    object is replaced by an object.

    Args:
        config: Mapping; only "telemetry_enabled" is read (optional).
        home_dir: Home directory to use; defaults to the current user's.

    Returns:
        Tuple (True, message) naming the settings file that was written.
    """
    if home_dir is None:
        home_dir = os.path.expanduser("~")

    gemini_dir = os.path.join(home_dir, ".gemini")
    settings_path = os.path.join(gemini_dir, "settings.json")

    # Ensure dir exists
    os.makedirs(gemini_dir, exist_ok=True)

    # Read existing settings
    existing = {}
    if os.path.isfile(settings_path):
        try:
            with open(settings_path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except OSError:
            # Unreadable file: start from empty settings.
            loaded = {}
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError.
            loaded = None

        if isinstance(loaded, dict):
            existing = loaded
        else:
            # Content is not a usable JSON object; keep a copy before the
            # file is overwritten below.
            backup_path = settings_path + ".backup"
            if not os.path.exists(backup_path):
                shutil.copy2(settings_path, backup_path)

    # Deep merge our settings, replacing any non-object intermediate value
    security = existing.get("security")
    if not isinstance(security, dict):
        security = {}
        existing["security"] = security
    auth = security.get("auth")
    if not isinstance(auth, dict):
        auth = {}
        security["auth"] = auth
    auth["selectedType"] = "gemini-api-key"

    telemetry = existing.get("telemetry")
    if not isinstance(telemetry, dict):
        telemetry = {}
        existing["telemetry"] = telemetry
    telemetry["enabled"] = config.get("telemetry_enabled", False)
    telemetry["logPrompts"] = False

    # Write back
    with open(settings_path, "w", encoding="utf-8") as f:
        json.dump(existing, f, indent=2)

    return True, f"Settings updated: {settings_path}"
|
|
|
|
|
|
# ─── Target 6: System env vars ─────────────────────────────────────────
|
|
|
|
def setup_env_vars(config):
    """Make GEMINI_API_KEY and GOOGLE_GEMINI_BASE_URL available as env vars.

    Target 6. On Windows this delegates to _setup_env_windows(). Elsewhere
    the two variables are written to /etc/environment: existing lines for
    the same keys (with or without an "export " prefix) are dropped and
    fresh KEY="value" lines are appended. The variables are also set in the
    current process.

    If /etc/environment cannot be read or written (no permission, read-only
    filesystem, or any other OSError), the variables are set for the current
    process only and the call still reports success.

    Args:
        config: Mapping that must contain "base_url" and "api_key".

    Returns:
        Tuple (ok, message).
    """
    base_url = config["base_url"]
    api_key = config["api_key"]

    env_vars = {
        "GEMINI_API_KEY": api_key,
        "GOOGLE_GEMINI_BASE_URL": base_url,
    }

    if IS_WINDOWS:
        return _setup_env_windows(env_vars)

    # Linux/macOS: write to /etc/environment
    env_file = "/etc/environment"

    # Line prefixes that mark an older definition of one of our variables.
    stale_prefixes = tuple(
        prefix
        for key in env_vars
        for prefix in (f"{key}=", f"export {key}=")
    )

    try:
        existing_content = ""
        if os.path.isfile(env_file):
            with open(env_file, "r") as f:
                existing_content = f.read()

        # Remove old entries, then add new ones
        new_lines = [
            line
            for line in existing_content.splitlines()
            if not line.startswith(stale_prefixes)
        ]
        new_lines.extend(f'{key}="{value}"' for key, value in env_vars.items())

        with open(env_file, "w") as f:
            f.write("\n".join(new_lines) + "\n")
    except PermissionError:
        # Fallback: just set for current process
        os.environ.update(env_vars)
        return True, f"Set {len(env_vars)} env var(s) in current process (no root access for {env_file})"
    except OSError as exc:
        # Same fallback for other I/O failures (e.g. read-only filesystem),
        # which would otherwise abort the whole patch run.
        os.environ.update(env_vars)
        return True, f"Set {len(env_vars)} env var(s) in current process (could not update {env_file}: {exc})"

    # Also export to current process
    os.environ.update(env_vars)
    return True, f"Set {len(env_vars)} env var(s) in {env_file}"
|
|
|
|
|
|
def _setup_env_windows(env_vars):
|
|
"""Set env vars on Windows via setx."""
|
|
changes = 0
|
|
for key, value in env_vars.items():
|
|
try:
|
|
subprocess.run(["setx", key, value], check=True, capture_output=True)
|
|
os.environ[key] = value
|
|
changes += 1
|
|
except Exception as e:
|
|
eprint(f"Warning: Failed to set {key}: {e}")
|
|
return changes > 0, f"Set {changes} env var(s) via setx"
|
|
|
|
|
|
# ─── Rollback ───────────────────────────────────────────────────────────
|
|
|
|
def rollback(genai_mjs_path, settings_js_path):
    """Copy each "<path>.backup" file back over its original.

    Files without a backup are skipped. The backups themselves are kept.

    Returns:
        Tuple (ok, message); ok is True when at least one file was restored.
    """
    restored = 0
    for target in (genai_mjs_path, settings_js_path):
        saved_copy = target + ".backup"
        if not os.path.exists(saved_copy):
            continue
        shutil.copy2(saved_copy, target)
        restored += 1

    succeeded = restored > 0
    return succeeded, f"Restored {restored} file(s) from backup"
|
|
|
|
|
|
# ─── Validation ─────────────────────────────────────────────────────────
|
|
|
|
def validate_installation(config):
    """Run two smoke checks against the `gemini` executable on PATH.

    1. version_check: `gemini --version` must exit with status 0.
    2. prompt_test: a one-shot prompt asking for the word OK must exit with
       status 0 and print "OK" as a standalone word (case-insensitive).
       The API key and base URL from *config* are passed via the
       environment of that subprocess.

    Any exception raised while running a check (executable missing,
    timeout, missing config key, ...) is recorded as a failed check rather
    than propagated.

    Args:
        config: Mapping that must contain "api_key" and "base_url".

    Returns:
        List of (check_name, passed, detail) tuples, one per check.
    """
    results = []

    # Check gemini exists and runs
    try:
        result = subprocess.run(
            ["gemini", "--version"],
            capture_output=True, text=True, timeout=10
        )
        version = result.stdout.strip() or result.stderr.strip()
        # A non-zero exit status means the CLI itself is broken, even if it
        # printed something.
        results.append(("version_check", result.returncode == 0, version))
    except Exception as e:
        results.append(("version_check", False, str(e)))

    # Test prompt (needs env vars set)
    try:
        env = os.environ.copy()
        env["GEMINI_API_KEY"] = config["api_key"]
        env["GOOGLE_GEMINI_BASE_URL"] = config["base_url"]

        result = subprocess.run(
            ["gemini", "-p", "Reply with just the word OK, nothing else"],
            capture_output=True, text=True, timeout=60, env=env
        )
        output = result.stdout.strip()
        # Match OK as a whole word: a plain substring test would also accept
        # output such as "Invalid token" or "broken pipe".
        said_ok = re.search(r"\bOK\b", output, re.IGNORECASE) is not None
        success = result.returncode == 0 and said_ok
        # Fall back to stderr for the detail when stdout is empty.
        detail = output or result.stderr.strip()
        results.append(("prompt_test", success, detail[:200]))
    except Exception as e:
        results.append(("prompt_test", False, str(e)))

    return results
|
|
|
|
|
|
# ─── Main orchestrator ──────────────────────────────────────────────────
|
|
|
|
def apply_all_patches(config=None, settings_only=False):
    """Run the patch targets in order and print one status line per target.

    Args:
        config: Parsed config mapping; loaded via load_config() when None.
        settings_only: When true, skip the two file-patching targets
            (genai URLs and settings.js) and run only the user settings
            and env var targets.

    Returns:
        Tuple (all_ok, results) where results maps a target key to its
        (ok, message) pair.

    Raises:
        FileNotFoundError: propagated from detect_gemini() / load_config().
    """
    if config is None:
        config = load_config()

    gemini_root, genai_mjs, settings_js = detect_gemini()
    version = read_version(gemini_root)

    print(f"\n{BOLD}Gemini Code Patcher{RESET}")
    print(f" Version: {color(version, CYAN)}")
    print(f" Root: {gemini_root}")
    print(f" Proxy: {config['base_url']}")
    print()

    # (result key, label shown to the user, callable performing the step)
    steps = []
    if not settings_only:
        steps.append(("genai_urls", "Target 1+2", lambda: patch_genai_urls(genai_mjs, config)))
        steps.append(("settings_js", "Target 3+4", lambda: patch_settings_js(settings_js)))
    steps.append(("user_settings", "Target 5", lambda: patch_user_settings(config)))
    steps.append(("env_vars", "Target 6", lambda: setup_env_vars(config)))

    results = {}
    for key, label, run_step in steps:
        ok, msg = run_step()
        results[key] = (ok, msg)
        status = "[OK]" if ok else "[FAIL]"
        print(f" {status:>8} {label}: {msg}")

    all_ok = all(ok for ok, _ in results.values())

    print()
    if all_ok:
        summary = color('All patches applied successfully!', GREEN)
    else:
        summary = color('Some patches failed. Check output above.', RED)
    print(f" {summary}")

    return all_ok, results
|
|
|
|
|
|
# ─── CLI ────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Command-line entry point.

    Exactly one action is performed, chosen in this order of precedence:
    --detect, --rollback, --validate, --apply / --settings-only. With no
    action flag the help text is printed.

    The config file is only loaded for the actions that use it (validate
    and apply), so --detect, --rollback and the help output work without
    one. A missing or malformed config, or a missing Gemini CLI install,
    is reported on stderr with exit status 1.
    """
    parser = argparse.ArgumentParser(
        description="Gemini Code Patcher — route Gemini CLI through custom proxy"
    )
    parser.add_argument("--apply", action="store_true", help="Apply all patches")
    parser.add_argument("--settings-only", action="store_true", help="Apply only settings.json + env vars")
    parser.add_argument("--rollback", action="store_true", help="Restore from backup")
    parser.add_argument("--detect", action="store_true", help="Detect Gemini CLI installation")
    parser.add_argument("--validate", action="store_true", help="Validate installation")
    parser.add_argument("--config", type=str, help="Path to config file")
    args = parser.parse_args()

    # Only validate/apply read the config, and only when a higher-priority
    # action (detect/rollback) has not been selected.
    config = None
    wants_config = args.validate or args.apply or args.settings_only
    if wants_config and not (args.detect or args.rollback):
        try:
            config = load_config(args.config)
        except FileNotFoundError as e:
            eprint(str(e))
            sys.exit(1)
        except json.JSONDecodeError as e:
            eprint(f"Invalid config file: {e}")
            sys.exit(1)

    if args.detect:
        try:
            root, genai, settings = detect_gemini()
            version = read_version(root)
            print("Gemini CLI found:")
            print(f" Root: {root}")
            print(f" Version: {version}")
            print(f" GenAI MJS: {genai}")
            print(f" Settings: {settings}")
        except FileNotFoundError as e:
            eprint(str(e))
            sys.exit(1)

    elif args.rollback:
        try:
            root, genai, settings = detect_gemini()
            ok, msg = rollback(genai, settings)
            print(msg)
            sys.exit(0 if ok else 1)
        except FileNotFoundError as e:
            eprint(str(e))
            sys.exit(1)

    elif args.validate:
        results = validate_installation(config)
        for name, ok, detail in results:
            status = color("[OK]", GREEN) if ok else color("[FAIL]", RED)
            print(f" {status} {name}: {detail}")
        sys.exit(0 if all(r[1] for r in results) else 1)

    elif args.apply or args.settings_only:
        try:
            ok, _ = apply_all_patches(config, settings_only=args.settings_only)
        except FileNotFoundError as e:
            # Raised by detect_gemini() when no installation is found;
            # handled the same way as in the detect/rollback branches.
            eprint(str(e))
            sys.exit(1)
        sys.exit(0 if ok else 1)

    else:
        parser.print_help()
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|