feat(codex): add Codex CLI patcher — config+env based patching
Codex CLI is a Rust binary — patched via config.toml + env vars. 6 targets: api_endpoint, auth, telemetry, permissions, model, env. Includes installer, binary updater, config validator, pipeline CLI. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
19
codex/codex_config.example.json
Normal file
19
codex/codex_config.example.json
Normal file
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"base_url": "https://your-api-endpoint.example.com",
|
||||
"api_key": "YOUR_API_KEY",
|
||||
"model": "gpt-5.2-codex",
|
||||
"models": [
|
||||
"gpt-5.3-codex",
|
||||
"gpt-5.2-codex",
|
||||
"o3",
|
||||
"o4-mini"
|
||||
],
|
||||
"model_reasoning_effort": "high",
|
||||
"approval_policy": "never",
|
||||
"sandbox_mode": "danger-full-access",
|
||||
"wire_api": "responses",
|
||||
"telemetry_enabled": false,
|
||||
"check_for_update": false,
|
||||
"trust_paths": ["/home", "/root", "/tmp"],
|
||||
"target_version": "0.111.0"
|
||||
}
|
||||
19
codex/codex_config.json
Normal file
19
codex/codex_config.json
Normal file
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"base_url": "https://ai.37-187-136-86.sslip.io",
|
||||
"api_key": "ClauderAPI",
|
||||
"model": "gpt-5.2-codex",
|
||||
"models": [
|
||||
"gpt-5.3-codex",
|
||||
"gpt-5.2-codex",
|
||||
"o3",
|
||||
"o4-mini"
|
||||
],
|
||||
"model_reasoning_effort": "high",
|
||||
"approval_policy": "never",
|
||||
"sandbox_mode": "danger-full-access",
|
||||
"wire_api": "responses",
|
||||
"telemetry_enabled": false,
|
||||
"check_for_update": false,
|
||||
"trust_paths": ["/home", "/root", "/tmp"],
|
||||
"target_version": "0.111.0"
|
||||
}
|
||||
619
codex/codex_patcher.py
Normal file
619
codex/codex_patcher.py
Normal file
@@ -0,0 +1,619 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Codex CLI Patcher — config+env patching for Codex CLI (Rust binary).
|
||||
|
||||
Unlike Claude Code and Gemini CLI (JavaScript), Codex is a compiled Rust binary.
|
||||
Patching is done via config.toml manipulation and environment variable injection.
|
||||
|
||||
Targets:
|
||||
1. api_endpoint — redirect API to custom proxy via model_providers
|
||||
2. authentication — API key auth via codex login
|
||||
3. telemetry — disable analytics
|
||||
4. permissions — bypass approvals + sandbox
|
||||
5. model_config — model, reasoning, disable auto-update
|
||||
6. system_env — /etc/environment vars
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import shutil
|
||||
import platform
|
||||
import subprocess
|
||||
import argparse
|
||||
import tomllib
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
|
||||
# ─── Constants ──────────────────────────────────────────────────────────

# Directory containing this script; codex_config.json lives alongside it.
SCRIPT_DIR = Path(__file__).resolve().parent
CONFIG_PATH = SCRIPT_DIR / "codex_config.json"

# Platform switches: pick `where`/setx on Windows, `which`//etc/environment
# elsewhere.  IS_MACOS is currently informational only.
IS_WINDOWS = platform.system() == "Windows"
IS_MACOS = platform.system() == "Darwin"

# ANSI colors
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
CYAN = "\033[96m"
BOLD = "\033[1m"
RESET = "\033[0m"

# Managed config keys (we update these, preserve everything else)
MANAGED_TOP_KEYS = {
    "model", "model_reasoning_effort", "model_provider",
    "approval_policy", "sandbox_mode",
    "check_for_update_on_startup", "forced_login_method",
}
# Sections of config.toml this patcher owns wholesale.
MANAGED_SECTIONS = {"analytics", "model_providers"}
|
||||
|
||||
|
||||
# ─── Config Loading ─────────────────────────────────────────────────────
|
||||
|
||||
def load_config(config_path=None):
    """Load codex_config.json, exiting with an error if it is missing.

    Falls back to CONFIG_PATH (next to this script) when no explicit
    path is given.
    """
    cfg_file = Path(config_path) if config_path else CONFIG_PATH
    if not cfg_file.is_file():
        print(f"{RED}Config not found: {cfg_file}{RESET}")
        sys.exit(1)
    with cfg_file.open() as fh:
        return json.load(fh)
|
||||
|
||||
|
||||
# ─── Detection ──────────────────────────────────────────────────────────
|
||||
|
||||
def detect_codex():
    """Locate the codex binary and report its version.

    Returns (binary_path, version); exits the process when no binary can
    be found.  Version is "unknown" when `codex --version` fails or its
    output is not in the expected "codex-cli X.Y.Z" shape.
    """
    locator = "where" if IS_WINDOWS else "which"
    binary_path = None
    try:
        proc = subprocess.run(
            [locator, "codex"], capture_output=True, text=True, timeout=5
        )
        if proc.returncode == 0:
            binary_path = proc.stdout.strip().split("\n")[0]
    except Exception:
        binary_path = None

    if not binary_path:
        # which/where came up empty — probe conventional install locations.
        for candidate in ("/usr/local/bin/codex", "/usr/bin/codex"):
            if os.path.isfile(candidate):
                binary_path = candidate
                break

    if not binary_path:
        print(f"{RED}Codex CLI not found. Install: https://github.com/openai/codex{RESET}")
        sys.exit(1)

    version = "unknown"
    try:
        proc = subprocess.run(
            [binary_path, "--version"], capture_output=True, text=True, timeout=10
        )
        # Expected output: "codex-cli 0.111.0" — take the final token.
        if proc.returncode == 0:
            tokens = proc.stdout.strip().split()
            if len(tokens) >= 2:
                version = tokens[-1]
    except Exception:
        pass

    return binary_path, version
|
||||
|
||||
|
||||
# ─── TOML Read/Write ────────────────────────────────────────────────────
|
||||
|
||||
def read_toml(path):
|
||||
"""Read TOML file. Returns dict or empty dict if not found."""
|
||||
if not os.path.isfile(path):
|
||||
return {}
|
||||
with open(path, "rb") as f:
|
||||
return tomllib.load(f)
|
||||
|
||||
|
||||
def read_toml_raw(path):
    """Return a TOML file's raw text, or "" when the file does not exist."""
    if not os.path.isfile(path):
        return ""
    with open(path, "r", encoding="utf-8") as fh:
        return fh.read()
|
||||
|
||||
|
||||
def toml_value(v):
    """Format a Python value as a TOML literal.

    Handles bool, str, int, float, and (nested) lists thereof.  Strings
    are escaped so values containing backslashes (e.g. Windows paths) or
    double quotes survive a round-trip through a TOML parser — the
    previous implementation emitted them verbatim, producing invalid
    TOML.  Anything else falls back to str(v); callers only pass the
    types above.
    """
    if isinstance(v, bool):
        return "true" if v else "false"
    if isinstance(v, str):
        # Escape backslash first, then quote, so we don't double-escape.
        escaped = v.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'
    if isinstance(v, (int, float)):
        return str(v)
    if isinstance(v, list):
        items = ", ".join(toml_value(i) for i in v)
        return f"[{items}]"
    return str(v)
|
||||
|
||||
|
||||
def generate_config_toml(existing, config):
    """Generate config.toml content, merging with existing user config.

    Strategy:
      - Write managed top-level keys from `config`
      - Write managed sections ([analytics], [model_providers.custom])
      - Add trust paths as [projects."…"] tables
      - Preserve all other user-defined content from `existing`

    Fix over the previous version: preserved sections are now emitted
    with a recursive table writer, so user config nested deeper than two
    levels round-trips correctly (it used to be serialized as invalid
    `str(dict)` output).
    """

    def _emit_table(out, name, table):
        # Recursively emit [name] with its scalar keys, then [name.sub]
        # tables for each nested dict (scalars must precede sub-tables
        # in TOML).
        out.append("")
        out.append(f"[{name}]")
        nested = []
        for k, v in table.items():
            if isinstance(v, dict):
                nested.append((k, v))
            else:
                out.append(f"{k} = {toml_value(v)}")
        for k, v in nested:
            _emit_table(out, f"{name}.{k}", v)

    lines = ["# Codex CLI Configuration (managed by codex_patcher.py)"]

    # Top-level managed keys — always ours, overriding existing values.
    lines.append(f'model = "{config["model"]}"')
    lines.append(f'model_reasoning_effort = "{config.get("model_reasoning_effort", "high")}"')
    lines.append('model_provider = "custom"')
    lines.append(f'approval_policy = "{config.get("approval_policy", "never")}"')
    lines.append(f'sandbox_mode = "{config.get("sandbox_mode", "danger-full-access")}"')
    lines.append(f'check_for_update_on_startup = {toml_value(config.get("check_for_update", False))}')
    lines.append('forced_login_method = "api"')

    # Preserve existing top-level scalar keys we don't manage.
    for key, val in existing.items():
        if key not in MANAGED_TOP_KEYS and not isinstance(val, dict):
            lines.append(f"{key} = {toml_value(val)}")

    # [analytics] — managed telemetry toggle.
    lines.append("")
    lines.append("[analytics]")
    lines.append(f"enabled = {toml_value(config.get('telemetry_enabled', False))}")

    # [model_providers.custom] — routes API traffic through the proxy.
    base_url = config["base_url"].rstrip("/")
    if not base_url.endswith("/v1"):
        base_url += "/v1"
    lines.append("")
    lines.append("[model_providers.custom]")
    lines.append('name = "custom"')
    lines.append(f'base_url = "{base_url}"')
    lines.append('env_key = "OPENAI_API_KEY"')
    lines.append(f'wire_api = "{config.get("wire_api", "responses")}"')

    # Preserve any other model_providers the user defined.
    mp = existing.get("model_providers", {})
    if isinstance(mp, dict):
        for name, provider in mp.items():
            if name == "custom":
                continue
            _emit_table(lines, f"model_providers.{name}", provider)

    # Trust paths: mark each configured path as trusted.
    trust_paths = config.get("trust_paths", ["/home", "/root", "/tmp"])
    for tp in trust_paths:
        lines.append("")
        lines.append(f'[projects."{tp}"]')
        lines.append('trust_level = "trusted"')

    # Preserve user's existing project trust entries (not in our list).
    for path, proj_conf in existing.get("projects", {}).items():
        if path not in trust_paths and isinstance(proj_conf, dict):
            _emit_table(lines, f'projects."{path}"', proj_conf)

    # Preserve other sections we don't manage, at any nesting depth.
    skip_sections = {"analytics", "model_providers", "projects"}
    for key, val in existing.items():
        if key in skip_sections or key in MANAGED_TOP_KEYS:
            continue
        if isinstance(val, dict):
            _emit_table(lines, key, val)

    return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
# ─── Backup ─────────────────────────────────────────────────────────────
|
||||
|
||||
def backup_file(path):
    """Copy *path* to a timestamped `.backup.<ts>` sibling.

    Returns the backup path, or None when *path* does not exist.
    """
    if not os.path.isfile(path):
        return None
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")
    backup_path = f"{path}.backup.{stamp}"
    shutil.copy2(path, backup_path)
    return backup_path
|
||||
|
||||
|
||||
# ─── Target 1: API Endpoint ─────────────────────────────────────────────
|
||||
|
||||
def patch_api_endpoint(codex_dir, config):
    """Target 1: Configure [model_providers.custom] in config.toml.

    Read-only check; the actual write happens in apply_all_patches.
    """
    current = read_toml(os.path.join(codex_dir, "config.toml"))

    # Normalize the proxy URL to end in a single /v1.
    base_url = config["base_url"].rstrip("/")
    if not base_url.endswith("/v1"):
        base_url += "/v1"

    providers = current.get("model_providers", {})
    custom = providers.get("custom", {}) if isinstance(providers, dict) else {}
    already = (
        custom.get("base_url") == base_url
        and current.get("model_provider") == "custom"
    )
    if already:
        return True, "Already configured"
    return True, f"Will set base_url={base_url}"
|
||||
|
||||
|
||||
# ─── Target 2: Authentication ───────────────────────────────────────────
|
||||
|
||||
def patch_auth(config, home_dir=None):
    """Target 2: Configure API key auth via codex login --with-api-key.

    Always returns (True, message) — login problems are reported in the
    message rather than treated as fatal.
    """
    api_key = config["api_key"]
    notes = []

    # Expose the key to this process and to the codex subprocess.
    os.environ["OPENAI_API_KEY"] = api_key
    child_env = os.environ.copy()
    child_env["OPENAI_API_KEY"] = api_key

    try:
        proc = subprocess.run(
            ["codex", "login", "--with-api-key"],
            input=api_key + "\n",
            capture_output=True, text=True, timeout=30, env=child_env,
        )
        if proc.returncode == 0:
            notes.append("codex login: ok")
        elif "already" in proc.stderr.strip().lower():
            # Non-zero exit can simply mean an existing session.
            notes.append("codex login: already authenticated")
        else:
            notes.append(f"codex login: exit {proc.returncode}")
    except subprocess.TimeoutExpired:
        notes.append("codex login: timeout (30s)")
    except FileNotFoundError:
        notes.append("codex login: binary not found")
    except Exception as e:
        notes.append(f"codex login: {e}")

    return True, "; ".join(notes)
|
||||
|
||||
|
||||
# ─── Target 3: Telemetry ────────────────────────────────────────────────
|
||||
|
||||
def patch_telemetry(codex_dir, config):
    """Target 3: Disable analytics in config.toml (read-only check)."""
    current = read_toml(os.path.join(codex_dir, "config.toml"))
    analytics = current.get("analytics", {})
    disabled = isinstance(analytics, dict) and analytics.get("enabled") is False
    if disabled:
        return True, "Already disabled"
    return True, "Will disable analytics"
|
||||
|
||||
|
||||
# ─── Target 4: Permissions ──────────────────────────────────────────────
|
||||
|
||||
def patch_permissions(codex_dir, config):
    """Target 4: Set approval_policy=never, sandbox=danger-full-access.

    Read-only check; the actual write happens in apply_all_patches.
    """
    current = read_toml(os.path.join(codex_dir, "config.toml"))
    wanted_policy = config.get("approval_policy", "never")
    wanted_sandbox = config.get("sandbox_mode", "danger-full-access")

    if (current.get("approval_policy") == wanted_policy
            and current.get("sandbox_mode") == wanted_sandbox):
        return True, "Already configured"
    return True, f"Will set approval={wanted_policy}, sandbox={wanted_sandbox}"
|
||||
|
||||
|
||||
# ─── Target 5: Model Config ─────────────────────────────────────────────
|
||||
|
||||
def patch_model_config(codex_dir, config):
    """Target 5: Set model, reasoning_effort, disable auto-update.

    Read-only check; the actual write happens in apply_all_patches.
    """
    current = read_toml(os.path.join(codex_dir, "config.toml"))
    wanted_model = config["model"]
    wanted_effort = config.get("model_reasoning_effort", "high")

    configured = (
        current.get("model") == wanted_model
        and current.get("model_reasoning_effort") == wanted_effort
        and current.get("check_for_update_on_startup") is False
    )
    if configured:
        return True, "Already configured"
    return True, f"Will set model={wanted_model}, effort={wanted_effort}"
|
||||
|
||||
|
||||
# ─── Target 6: System Environment ───────────────────────────────────────
|
||||
|
||||
def setup_env_vars(config):
    """Target 6: Persist OPENAI_BASE_URL / OPENAI_API_KEY system-wide.

    Windows: `setx /M` (now counted as success only on exit code 0 — the
    old code counted every attempt).  Linux/macOS: /etc/environment.

    Fix: the old code tested `if key in content`, a bare substring
    match, so a commented-out `#OPENAI_API_KEY=...` line (or any other
    mention of the name) made it neither update nor append the variable.
    We now match on a literal `KEY=` line prefix.
    """
    base_url = config["base_url"].rstrip("/")
    if not base_url.endswith("/v1"):
        base_url += "/v1"

    env_vars = {
        "OPENAI_BASE_URL": base_url,
        "OPENAI_API_KEY": config["api_key"],
    }

    if IS_WINDOWS:
        count = 0
        for key, val in env_vars.items():
            try:
                proc = subprocess.run(
                    ["setx", key, val, "/M"],
                    capture_output=True, timeout=10
                )
                if proc.returncode == 0:
                    count += 1
            except Exception:
                pass
        return count > 0, f"Set {count} env var(s) via setx"

    # Linux/macOS: /etc/environment
    etc_env = "/etc/environment"
    try:
        content = ""
        if os.path.isfile(etc_env):
            with open(etc_env, "r") as f:
                content = f.read()

        file_lines = content.split("\n")
        changed = False
        for key, val in env_vars.items():
            assignment = f'{key}="{val}"'
            matched = False
            for i, existing_line in enumerate(file_lines):
                # Only a real assignment line counts as "already present".
                if existing_line.startswith(f"{key}="):
                    matched = True
                    if existing_line != assignment:
                        file_lines[i] = assignment
                        changed = True
            if not matched:
                # Append before trailing blank lines, keeping one final \n.
                while file_lines and file_lines[-1] == "":
                    file_lines.pop()
                file_lines.append(assignment)
                changed = True

        if changed:
            new_content = "\n".join(file_lines)
            if not new_content.endswith("\n"):
                new_content += "\n"
            with open(etc_env, "w") as f:
                f.write(new_content)
            return True, f"Set {len(env_vars)} env var(s) in {etc_env}"
        return True, "Env vars already set"

    except PermissionError:
        return False, f"Permission denied: {etc_env} (run as root)"
    except Exception as e:
        return False, f"Error: {e}"
|
||||
|
||||
|
||||
# ─── Apply All Patches ──────────────────────────────────────────────────
|
||||
|
||||
def apply_all_patches(config, home_dir=None):
    """Apply all 6 patch targets. Returns (all_ok, results_dict).

    Writes the merged config.toml once (after a timestamped backup),
    then runs each target's check/apply function and prints a one-line
    status per target.  The six hand-unrolled target invocations were
    collapsed into a data-driven loop — behavior and output format are
    unchanged.
    """
    if home_dir is None:
        home_dir = os.path.expanduser("~")

    codex_dir = os.path.join(home_dir, ".codex")
    os.makedirs(codex_dir, exist_ok=True)
    config_path = os.path.join(codex_dir, "config.toml")

    binary_path, version = detect_codex()
    print(f"\n{BOLD}Codex CLI Patcher{RESET}")
    print(f" Version: {CYAN}{version}{RESET}")
    print(f" Binary: {binary_path}")
    print(f" Proxy: {config['base_url']}")
    print()

    # Merge with the user's existing config, backing up first.
    existing = read_toml(config_path)
    backup_file(config_path)
    new_content = generate_config_toml(existing, config)
    with open(config_path, "w", encoding="utf-8") as f:
        f.write(new_content)

    # (result key, printed label, thunk) — one entry per target.
    targets = [
        ("api_endpoint", "Target 1", lambda: patch_api_endpoint(codex_dir, config)),
        ("authentication", "Target 2", lambda: patch_auth(config, home_dir)),
        ("telemetry", "Target 3", lambda: patch_telemetry(codex_dir, config)),
        ("permissions", "Target 4", lambda: patch_permissions(codex_dir, config)),
        ("model_config", "Target 5", lambda: patch_model_config(codex_dir, config)),
        ("system_env", "Target 6", lambda: setup_env_vars(config)),
    ]

    results = {}
    all_ok = True
    for key, label, run_target in targets:
        ok, msg = run_target()
        results[key] = (ok, msg)
        print(f" {'[OK]' if ok else '[FAIL]':>8} {label}: {msg}")
        if not ok:
            all_ok = False

    print()
    if all_ok:
        print(f" {GREEN}All patches applied successfully!{RESET}")
    else:
        print(f" {RED}Some patches failed. Check output above.{RESET}")

    return all_ok, results
|
||||
|
||||
|
||||
# ─── Rollback ────────────────────────────────────────────────────────────
|
||||
|
||||
def rollback(home_dir=None):
    """Restore ~/.codex/config.toml from the newest timestamped backup.

    Returns True on restore, False when no backup exists.
    """
    if home_dir is None:
        home_dir = os.path.expanduser("~")

    codex_dir = Path(home_dir) / ".codex"
    config_path = os.path.join(str(codex_dir), "config.toml")

    # Backup names end in a zero-padded YYYYmmddHHMMSS timestamp, so
    # lexicographic order equals chronological order.
    backups = sorted(codex_dir.glob("config.toml.backup.*"), reverse=True)
    if not backups:
        print(f"{RED}No backups found in {codex_dir}{RESET}")
        return False

    newest = backups[0]
    shutil.copy2(str(newest), config_path)
    print(f"{GREEN}Restored from {newest.name}{RESET}")
    return True
|
||||
|
||||
|
||||
# ─── Multi-User Support ─────────────────────────────────────────────────
|
||||
|
||||
def list_users():
    """Return pwd entries for human users: existing home dir, uid 0 or
    >= 1000, and a real login shell.  Empty list on non-POSIX systems."""
    found = []
    try:
        import pwd
    except ImportError:
        # No password database (e.g. Windows).
        return found
    for entry in pwd.getpwall():
        if not os.path.isdir(entry.pw_dir):
            continue
        # Skip system accounts (uid < 1000) except root.
        if entry.pw_uid != 0 and entry.pw_uid < 1000:
            continue
        if entry.pw_shell in ("/usr/sbin/nologin", "/bin/false"):
            continue
        found.append(entry)
    return found
|
||||
|
||||
|
||||
def patch_user(user_home, config):
    """Patch one user's ~/.codex/config.toml: backup, merge, rewrite."""
    user_codex_dir = os.path.join(user_home, ".codex")
    os.makedirs(user_codex_dir, exist_ok=True)

    toml_path = os.path.join(user_codex_dir, "config.toml")
    current = read_toml(toml_path)
    backup_file(toml_path)

    merged = generate_config_toml(current, config)
    with open(toml_path, "w", encoding="utf-8") as f:
        f.write(merged)

    return True
|
||||
|
||||
|
||||
# ─── CLI ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
    """CLI entry point: parse arguments and dispatch to the chosen action."""
    parser = argparse.ArgumentParser(
        description="Codex CLI Patcher — route Codex through custom AI proxy"
    )
    parser.add_argument("--apply", action="store_true", help="Apply all patches")
    parser.add_argument("--all", action="store_true", help="Patch all user accounts")
    parser.add_argument("--rollback", action="store_true", help="Restore from backup")
    parser.add_argument("--detect", action="store_true", help="Detect Codex installation")
    parser.add_argument("--validate", action="store_true", help="Validate config state")
    parser.add_argument("--config", type=str, help="Path to codex_config.json")
    parser.add_argument("--yes", action="store_true", help="Non-interactive mode")
    args = parser.parse_args()

    config = load_config(args.config)

    if args.detect:
        binary_path, version = detect_codex()
        print(f"Binary: {binary_path}")
        print(f"Version: {version}")
        return 0

    if args.rollback:
        return 0 if rollback() else 1

    if args.validate:
        # Lazy import so the normal path has no dependency on the
        # validator.  NOTE(review): assumes an updater/ package lives
        # next to this script — confirm it ships with the repo.
        sys.path.insert(0, str(SCRIPT_DIR))
        from updater.config_validator import validate_all, print_validation_report
        results = validate_all(os.path.expanduser("~/.codex"), config)
        print_validation_report(results)
        return 0 if all(r[1] == "GREEN" for r in results) else 1

    if args.apply:
        # Patch the current user first.
        ok, _results = apply_all_patches(config)

        if args.all:
            # Then every other human user's home directory.
            own_home = os.path.expanduser("~")
            for user in list_users():
                if user.pw_dir == own_home:
                    continue
                try:
                    patch_user(user.pw_dir, config)
                    print(f" Patched {user.pw_name}: {user.pw_dir}/.codex/config.toml")
                except Exception as e:
                    print(f" {RED}Failed {user.pw_name}: {e}{RESET}")

        return 0 if ok else 1

    parser.print_help()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|
||||
63
codex/ucodex_install.sh
Executable file
63
codex/ucodex_install.sh
Executable file
@@ -0,0 +1,63 @@
|
||||
#!/bin/bash
# UnlimitedCoding — Codex CLI Installer
# Downloads Codex binary from GitHub + applies config patches
#
# Usage:
#   curl -fsSL https://git.sensey24.ru/.../ucodex_install.sh | sudo bash

set -e

RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
CYAN='\033[0;36m'
BOLD='\033[1m'
NC='\033[0m'

echo -e "${BOLD}=== UnlimitedCoding — Codex CLI Installer ===${NC}"

# Check prerequisites
for cmd in python3 curl; do
    command -v "$cmd" &>/dev/null || {
        echo -e "${RED}Error: $cmd is required but not found${NC}"
        exit 1
    }
done

# Check Python version (need 3.11+ for tomllib)
PY_VER=$(python3 -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
PY_MAJOR=${PY_VER%%.*}
PY_MINOR=${PY_VER##*.}
if [ "$PY_MAJOR" -lt 3 ] || { [ "$PY_MAJOR" -eq 3 ] && [ "$PY_MINOR" -lt 11 ]; }; then
    echo -e "${RED}Error: Python 3.11+ required (found $PY_VER)${NC}"
    exit 1
fi

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

# Step 1: Install/update Codex binary
echo -e "\n${BOLD}Step 1: Installing Codex CLI binary...${NC}"
if [ ! -f "$SCRIPT_DIR/update-codex.sh" ]; then
    echo -e "${RED}update-codex.sh not found${NC}"
    exit 1
fi
bash "$SCRIPT_DIR/update-codex.sh"

# Step 2: Apply config patches
echo -e "\n${BOLD}Step 2: Applying config patches...${NC}"
if [ ! -f "$SCRIPT_DIR/codex_config.json" ]; then
    echo -e "${YELLOW}codex_config.json not found, copying example...${NC}"
    cp "$SCRIPT_DIR/codex_config.example.json" "$SCRIPT_DIR/codex_config.json"
    echo -e "${YELLOW}Edit codex_config.json with your API endpoint and key, then re-run.${NC}"
    exit 1
fi

python3 "$SCRIPT_DIR/codex_patcher.py" --apply --config "$SCRIPT_DIR/codex_config.json"

# Step 3: Validate
echo -e "\n${BOLD}Step 3: Validating...${NC}"
python3 "$SCRIPT_DIR/update_codex_patcher.py" --validate

echo -e "\n${GREEN}=== Installation complete! ===${NC}"
echo -e "Run: ${CYAN}codex${NC} to start"
|
||||
185
codex/update-codex.sh
Executable file
185
codex/update-codex.sh
Executable file
@@ -0,0 +1,185 @@
|
||||
#!/bin/bash
#
# Universal update script for the OpenAI Codex CLI.
# Automatically downloads the latest release from GitHub Releases.
# Uses the musl build for compatibility with older systems.
#

set -e

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Resolve the codex binary location (falls back to /usr/local/bin/codex)
CODEX_PATH=$(which codex 2>/dev/null || echo "/usr/local/bin/codex")
CODEX_DIR=$(dirname "$CODEX_PATH")
TEMP_DIR="/tmp/codex-update-$$"
GITHUB_API="https://api.github.com/repos/openai/codex/releases/latest"

# Pick the musl build matching the machine architecture
ARCH=$(uname -m)
case "$ARCH" in
    x86_64)
        BINARY_SUFFIX="x86_64-unknown-linux-musl"
        ;;
    aarch64|arm64)
        BINARY_SUFFIX="aarch64-unknown-linux-musl"
        ;;
    *)
        echo -e "${RED}Неподдерживаемая архитектура: $ARCH${NC}"
        exit 1
        ;;
esac

echo -e "${BLUE}========================================${NC}"
echo -e "${BLUE} OpenAI Codex CLI Updater${NC}"
echo -e "${BLUE}========================================${NC}"
echo ""

# Print the currently installed version, or a status marker when codex
# is missing ("не установлен") or does not run ("не работает")
get_current_version() {
    if command -v codex &> /dev/null; then
        local ver=$(codex --version 2>/dev/null | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1)
        if [ -n "$ver" ]; then
            echo "$ver"
        else
            echo "не работает"
        fi
    else
        echo "не установлен"
    fi
}

# Fetch the latest release version from the GitHub API
# (release tags look like "rust-v0.111.0")
get_latest_version() {
    curl -s "$GITHUB_API" | grep -oP '"tag_name":\s*"rust-v\K[0-9]+\.[0-9]+\.[0-9]+' | head -1
}

# Version comparison: exit status 0 (true) when $1 > $2
version_gt() {
    test "$(printf '%s\n' "$1" "$2" | sort -V | tail -n 1)" != "$2"
}

# Gather versions
echo -e "${YELLOW}Проверка версий...${NC}"
CURRENT_VERSION=$(get_current_version)
echo -e "Текущая версия: ${BLUE}$CURRENT_VERSION${NC}"

LATEST_VERSION=$(get_latest_version)
if [ -z "$LATEST_VERSION" ]; then
    echo -e "${RED}Не удалось получить информацию о последней версии${NC}"
    exit 1
fi
echo -e "Последняя версия: ${GREEN}$LATEST_VERSION${NC}"
echo ""

# Nothing to do when already on the latest version
if [ "$CURRENT_VERSION" = "$LATEST_VERSION" ]; then
    echo -e "${GREEN}✓ Codex уже обновлён до последней версии!${NC}"
    exit 0
fi

# A missing or broken install is always updated; otherwise only update
# when the release is strictly newer
if [ "$CURRENT_VERSION" != "не установлен" ] && [ "$CURRENT_VERSION" != "не работает" ]; then
    if ! version_gt "$LATEST_VERSION" "$CURRENT_VERSION"; then
        echo -e "${GREEN}✓ Текущая версия актуальна или новее${NC}"
        exit 0
    fi
fi

echo -e "${YELLOW}Требуется обновление: $CURRENT_VERSION → $LATEST_VERSION${NC}"
echo ""

# Build the download URL
DOWNLOAD_URL="https://github.com/openai/codex/releases/download/rust-v${LATEST_VERSION}/codex-${BINARY_SUFFIX}.tar.gz"
echo -e "${BLUE}Архитектура: $ARCH (${BINARY_SUFFIX})${NC}"
echo -e "${BLUE}URL: $DOWNLOAD_URL${NC}"
echo ""

# Create a scratch directory
mkdir -p "$TEMP_DIR"
cd "$TEMP_DIR"

# Download
echo -e "${YELLOW}Скачивание...${NC}"
if ! curl -L -# -o codex.tar.gz "$DOWNLOAD_URL"; then
    echo -e "${RED}Ошибка скачивания${NC}"
    rm -rf "$TEMP_DIR"
    exit 1
fi

# Unpack
echo -e "${YELLOW}Распаковка...${NC}"
tar -xzf codex.tar.gz

# Locate the binary (may be named codex or codex-$BINARY_SUFFIX)
BINARY_FILE=""
if [ -f "codex" ]; then
    BINARY_FILE="codex"
elif [ -f "codex-${BINARY_SUFFIX}" ]; then
    BINARY_FILE="codex-${BINARY_SUFFIX}"
else
    # Fall back to any file starting with "codex" (excluding the archive)
    BINARY_FILE=$(find . -maxdepth 1 -name 'codex*' -type f ! -name '*.gz' | head -1)
fi

if [ -z "$BINARY_FILE" ] || [ ! -f "$BINARY_FILE" ]; then
    echo -e "${RED}Бинарник codex не найден в архиве${NC}"
    ls -la
    rm -rf "$TEMP_DIR"
    exit 1
fi

echo -e "${GREEN}Найден бинарник: $BINARY_FILE${NC}"

# Kill any running codex process before replacing the binary
if pgrep -x "codex" > /dev/null; then
    echo -e "${YELLOW}Обнаружен запущенный процесс codex, завершаем...${NC}"
    pkill -9 -x "codex" 2>/dev/null || true
    sleep 1
    echo -e "${GREEN}✓ Процесс завершён${NC}"
fi

# Install
echo -e "${YELLOW}Установка в $CODEX_PATH...${NC}"
chmod +x "$BINARY_FILE"

# Use sudo only when the target directory is not writable
if [ -w "$CODEX_DIR" ]; then
    mv -f "$BINARY_FILE" "$CODEX_PATH"
else
    echo -e "${YELLOW}Требуются права sudo для записи в $CODEX_DIR${NC}"
    sudo mv -f "$BINARY_FILE" "$CODEX_PATH"
fi

# Cleanup
cd /
rm -rf "$TEMP_DIR"

# Refresh the shell's command hash table
hash -r 2>/dev/null || true

# Verify the result
echo ""
NEW_VERSION=$(get_current_version)

echo -e "${BLUE}========================================${NC}"
echo -e "${GREEN}✓ Обновление завершено!${NC}"
echo -e "${BLUE}========================================${NC}"
echo -e "Было: ${RED}$CURRENT_VERSION${NC}"
echo -e "Стало: ${GREEN}$NEW_VERSION${NC}"
echo ""

# Final check
if [ "$NEW_VERSION" = "$LATEST_VERSION" ]; then
    echo -e "${GREEN}✓ Версия успешно обновлена до $LATEST_VERSION${NC}"
elif [ "$NEW_VERSION" = "не работает" ]; then
    echo -e "${RED}✗ Бинарник не запускается! Проверьте зависимости.${NC}"
    ldd "$CODEX_PATH" 2>&1 | grep "not found" || true
else
    echo -e "${YELLOW}⚠ Версия после установки: $NEW_VERSION (ожидалась $LATEST_VERSION)${NC}"
fi
|
||||
239
codex/update_codex_patcher.py
Normal file
239
codex/update_codex_patcher.py
Normal file
@@ -0,0 +1,239 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Codex Patcher Update Pipeline — check, update, patch, validate, test.
|
||||
|
||||
Usage:
|
||||
python3 update_codex_patcher.py --check # Check for new version
|
||||
python3 update_codex_patcher.py --update # Download + install new binary
|
||||
python3 update_codex_patcher.py --validate # Validate 6 config targets
|
||||
python3 update_codex_patcher.py --patch # Apply config patches
|
||||
python3 update_codex_patcher.py --test # Integration test
|
||||
python3 update_codex_patcher.py --auto # Full cycle
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
# Directory containing this script; prepended to sys.path so sibling modules
# (codex_patcher, updater/) import regardless of the caller's CWD.
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR))
|
||||
|
||||
# ANSI colors
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
CYAN = "\033[96m"
BOLD = "\033[1m"
RESET = "\033[0m"


def color(text, c):
    """Return *text* wrapped in the ANSI escape *c*, followed by a reset."""
    return "{}{}{}".format(c, text, RESET)
|
||||
|
||||
|
||||
def load_config():
    """Load codex_config.json from the script directory; exit(1) if missing."""
    config_path = SCRIPT_DIR / "codex_config.json"
    if not config_path.is_file():
        print(f" {color('Config not found: ' + str(config_path), RED)}")
        sys.exit(1)
    return json.loads(config_path.read_text())
|
||||
|
||||
|
||||
def cmd_check(config):
    """Check GitHub releases for a newer Codex version.

    Returns True when the installed version already matches the latest
    release, False when an update is available or the check failed.
    The *config* parameter is accepted for signature parity with the
    other cmd_* handlers and is currently unused.
    """
    print(f"\n{BOLD}Checking for updates...{RESET}")
    try:
        import urllib.request
        url = "https://api.github.com/repos/openai/codex/releases/latest"
        req = urllib.request.Request(url, headers={"User-Agent": "codex-patcher"})
        with urllib.request.urlopen(req, timeout=15) as resp:
            data = json.loads(resp.read())

        latest_tag = data.get("tag_name", "")
        # Tag format: "rust-v0.111.0". Bug fix: the previous
        # .replace("v", "") stripped *every* "v" in the tag, which would
        # mangle any tag containing a "v" beyond the prefix; removeprefix
        # only strips a leading marker.
        latest_version = latest_tag.removeprefix("rust-v").removeprefix("v")

        # Installed version comes from the sibling codex_patcher module.
        from codex_patcher import detect_codex
        _, installed = detect_codex()

        print(f" Installed: {CYAN}{installed}{RESET}")
        print(f" Latest: {CYAN}{latest_version}{RESET}")

        if installed == latest_version:
            print(f" {GREEN}Already up to date!{RESET}")
            return True
        print(f" {YELLOW}Update available: {installed} → {latest_version}{RESET}")
        return False

    except Exception as e:
        # Network, JSON, and import failures are all reported uniformly;
        # the pipeline treats a failed check as "update may be available".
        print(f" {color(f'Error: {e}', RED)}")
        return False
|
||||
|
||||
|
||||
def cmd_update(config):
    """Download and install a new Codex binary via update-codex.sh.

    Returns True only when the shell script exits with status 0.
    """
    print(f"\n{BOLD}Updating Codex binary...{RESET}")
    update_script = SCRIPT_DIR / "update-codex.sh"
    if not update_script.is_file():
        print(f" {color(f'update-codex.sh not found at {update_script}', RED)}")
        return False

    try:
        # The shell script handles download, process kill and install;
        # allow it up to five minutes.
        proc = subprocess.run(["bash", str(update_script)], timeout=300)
        return proc.returncode == 0
    except Exception as e:
        print(f" {color(f'Error: {e}', RED)}")
        return False
|
||||
|
||||
|
||||
def cmd_validate(config):
    """Validate all 6 config targets and persist a JSON report.

    Returns True when no target is RED (critical).
    """
    print(f"\n{BOLD}Validating config targets...{RESET}")
    try:
        from updater.config_validator import validate_all, print_validation_report

        results = validate_all(os.path.expanduser("~/.codex"), config)
        counts = print_validation_report(results)

        # Persist a machine-readable copy of the report.
        report_dir = SCRIPT_DIR / "reports"
        report_dir.mkdir(exist_ok=True)
        report_path = report_dir / "validation_report.json"
        summary = {
            "targets": [
                {"name": target.name, "status": status, "message": message}
                for target, status, message in results
            ],
            "counts": counts,
        }
        with open(report_path, "w") as f:
            json.dump(summary, f, indent=2)
        print(f"\n Report saved: {report_path}")

        # Success means no critical (RED) findings.
        return counts.get("RED", 0) == 0
    except Exception as e:
        print(f" {color(f'Error: {e}', RED)}")
        return False
|
||||
|
||||
|
||||
def cmd_patch(config):
    """Apply config patches via the sibling codex_patcher module."""
    print(f"\n{BOLD}Applying patches...{RESET}")
    try:
        from codex_patcher import apply_all_patches
        success, _results = apply_all_patches(config)
        return success
    except Exception as exc:
        print(f" {color(f'Error: {exc}', RED)}")
        return False
|
||||
|
||||
|
||||
def cmd_test(config):
    """Integration test: ask codex to print 42 through the configured proxy.

    Returns True when "42" appears in codex's stdout within 60 seconds.
    """
    print(f"\n{BOLD}Running integration test...{RESET}")

    # Normalize the proxy URL so it always ends in /v1.
    base_url = config["base_url"].rstrip("/")
    if not base_url.endswith("/v1"):
        base_url = base_url + "/v1"

    # Inject the proxy endpoint/key without touching the parent environment.
    env = os.environ.copy()
    env.update(OPENAI_BASE_URL=base_url, OPENAI_API_KEY=config["api_key"])

    cmd = [
        "codex", "exec",
        "--sandbox", "danger-full-access",
        "Reply with just the number 42",
    ]
    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=60, env=env
        )
    except subprocess.TimeoutExpired:
        print(f" {RED}Test timed out (60s){RESET}")
        return False
    except Exception as e:
        print(f" {color(f'Error: {e}', RED)}")
        return False

    output = result.stdout.strip()
    print(f" Output: {output[:200]}")

    if "42" in output:
        print(f" {GREEN}Test passed!{RESET}")
        return True

    print(f" {YELLOW}Unexpected output (no '42' found){RESET}")
    if result.stderr:
        print(f" Stderr: {result.stderr[:200]}")
    return False
|
||||
|
||||
|
||||
def cmd_auto(config):
    """Full pipeline: check → update → patch → validate → test.

    Returns True when every step after the version check succeeds.
    """
    banner = "=" * 50
    print(f"\n{BOLD}{banner}{RESET}")
    print(f"{BOLD} Codex Patcher — Auto Update Pipeline{RESET}")
    print(f"{BOLD}{banner}{RESET}")

    pipeline = (
        ("Check version", cmd_check),
        ("Update binary", cmd_update),
        ("Apply patches", cmd_patch),
        ("Validate", cmd_validate),
        ("Test", cmd_test),
    )

    for step_name, step in pipeline:
        # A failed version check only means "update available" — continue.
        if not step(config) and step_name != "Check version":
            print(f"\n {RED}Pipeline stopped at: {step_name}{RESET}")
            return False

    print(f"\n{GREEN}{banner}{RESET}")
    print(f"{GREEN} Pipeline completed successfully!{RESET}")
    print(f"{GREEN}{banner}{RESET}")
    return True
|
||||
|
||||
|
||||
def main():
    """CLI entry point. Returns a process exit code (0 on success)."""
    parser = argparse.ArgumentParser(
        description="Codex Patcher Update Pipeline"
    )
    parser.add_argument("--check", action="store_true", help="Check for new version")
    parser.add_argument("--update", action="store_true", help="Update binary")
    parser.add_argument("--validate", action="store_true", help="Validate config")
    parser.add_argument("--patch", action="store_true", help="Apply patches")
    parser.add_argument("--test", action="store_true", help="Run integration test")
    parser.add_argument("--auto", action="store_true", help="Full auto cycle")
    args = parser.parse_args()

    config = load_config()

    # First matching flag wins; --auto takes priority over individual steps.
    dispatch = (
        (args.auto, cmd_auto),
        (args.check, cmd_check),
        (args.update, cmd_update),
        (args.validate, cmd_validate),
        (args.patch, cmd_patch),
        (args.test, cmd_test),
    )
    for selected, command in dispatch:
        if selected:
            return 0 if command(config) else 1

    parser.print_help()
    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Propagate the pipeline's exit code to the shell.
    raise SystemExit(main())
|
||||
0
codex/updater/__init__.py
Normal file
0
codex/updater/__init__.py
Normal file
259
codex/updater/config_validator.py
Normal file
259
codex/updater/config_validator.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""Config validator for Codex Patcher — validates 6 config targets.
|
||||
|
||||
Unlike Claude/Gemini patchers (regex-based), Codex validation is state-based:
|
||||
checks config.toml values and environment variables.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tomllib
|
||||
from dataclasses import dataclass
|
||||
from typing import Callable, Optional
|
||||
|
||||
|
||||
@dataclass
class ConfigTarget:
    # Short identifier for the patch target (e.g. "api_endpoint").
    name: str
    # Human-readable summary shown in the validation report.
    description: str
    # Where the target is validated.
    check_key: str  # "config_toml" | "env" | "auth"
|
||||
|
||||
|
||||
# The six patch targets reported by validate_all(). Order matters: it must
# stay in sync with the sequence of checks performed in validate_all().
PATCH_TARGETS = [
    ConfigTarget(
        name="api_endpoint",
        description="Custom proxy via model_providers",
        check_key="config_toml",
    ),
    ConfigTarget(
        name="authentication",
        description="API key auth configured",
        check_key="auth",
    ),
    ConfigTarget(
        name="analytics_disabled",
        description="Analytics/telemetry disabled",
        check_key="config_toml",
    ),
    ConfigTarget(
        name="approval_bypass",
        description="Approval policy set to never",
        check_key="config_toml",
    ),
    ConfigTarget(
        name="sandbox_bypass",
        description="Sandbox set to danger-full-access",
        check_key="config_toml",
    ),
    ConfigTarget(
        name="env_vars",
        description="System environment variables configured",
        check_key="env",
    ),
]
|
||||
|
||||
|
||||
def _read_toml(path):
    """Read a TOML file; return the parsed dict, or None if *path* is not a file.

    Callers distinguish None ("config.toml not found") from an empty or
    partial config, so a missing file must not be collapsed into {}.
    """
    if not os.path.isfile(path):
        return None
    with open(path, "rb") as f:
        return tomllib.load(f)
|
||||
|
||||
|
||||
def _check_api_endpoint(toml_data, config):
|
||||
"""Check Target 1: model_providers.custom with correct base_url."""
|
||||
if toml_data is None:
|
||||
return "RED", "config.toml not found"
|
||||
|
||||
mp = toml_data.get("model_providers", {})
|
||||
custom = mp.get("custom", {}) if isinstance(mp, dict) else {}
|
||||
|
||||
base_url = config["base_url"].rstrip("/")
|
||||
if not base_url.endswith("/v1"):
|
||||
base_url += "/v1"
|
||||
|
||||
if not custom:
|
||||
return "RED", "model_providers.custom section missing"
|
||||
|
||||
if custom.get("base_url") != base_url:
|
||||
return "YELLOW", f"base_url mismatch: {custom.get('base_url')} != {base_url}"
|
||||
|
||||
if toml_data.get("model_provider") != "custom":
|
||||
return "YELLOW", "model_provider != 'custom'"
|
||||
|
||||
return "GREEN", f"base_url={base_url}"
|
||||
|
||||
|
||||
def _check_auth(codex_dir, config):
|
||||
"""Check Target 2: API key authentication."""
|
||||
# Check env var
|
||||
env_key = os.environ.get("OPENAI_API_KEY", "")
|
||||
if env_key == config["api_key"]:
|
||||
return "GREEN", "OPENAI_API_KEY set correctly"
|
||||
|
||||
# Check /etc/environment
|
||||
etc_env = "/etc/environment"
|
||||
if os.path.isfile(etc_env):
|
||||
with open(etc_env) as f:
|
||||
content = f.read()
|
||||
if config["api_key"] in content:
|
||||
return "GREEN", "API key in /etc/environment"
|
||||
|
||||
if env_key:
|
||||
return "YELLOW", "OPENAI_API_KEY set but different value"
|
||||
|
||||
return "RED", "OPENAI_API_KEY not set"
|
||||
|
||||
|
||||
def _check_analytics(toml_data):
|
||||
"""Check Target 3: analytics disabled."""
|
||||
if toml_data is None:
|
||||
return "RED", "config.toml not found"
|
||||
|
||||
analytics = toml_data.get("analytics", {})
|
||||
if not isinstance(analytics, dict):
|
||||
return "RED", "[analytics] section missing"
|
||||
|
||||
if analytics.get("enabled") is False:
|
||||
return "GREEN", "analytics.enabled = false"
|
||||
|
||||
if "enabled" not in analytics:
|
||||
return "YELLOW", "[analytics] exists but 'enabled' key missing"
|
||||
|
||||
return "YELLOW", f"analytics.enabled = {analytics.get('enabled')}"
|
||||
|
||||
|
||||
def _check_approval(toml_data, config):
|
||||
"""Check Target 4: approval_policy."""
|
||||
if toml_data is None:
|
||||
return "RED", "config.toml not found"
|
||||
|
||||
target_policy = config.get("approval_policy", "never")
|
||||
current = toml_data.get("approval_policy")
|
||||
|
||||
if current == target_policy:
|
||||
return "GREEN", f'approval_policy = "{target_policy}"'
|
||||
|
||||
if current is not None:
|
||||
return "YELLOW", f'approval_policy = "{current}" (expected "{target_policy}")'
|
||||
|
||||
return "RED", "approval_policy not set"
|
||||
|
||||
|
||||
def _check_sandbox(toml_data, config):
|
||||
"""Check Target 5: sandbox_mode."""
|
||||
if toml_data is None:
|
||||
return "RED", "config.toml not found"
|
||||
|
||||
target_mode = config.get("sandbox_mode", "danger-full-access")
|
||||
current = toml_data.get("sandbox_mode")
|
||||
|
||||
if current == target_mode:
|
||||
return "GREEN", f'sandbox_mode = "{target_mode}"'
|
||||
|
||||
if current is not None:
|
||||
return "YELLOW", f'sandbox_mode = "{current}" (expected "{target_mode}")'
|
||||
|
||||
return "RED", "sandbox_mode not set"
|
||||
|
||||
|
||||
def _check_env_vars(config):
    """Target 6: OPENAI_BASE_URL and OPENAI_API_KEY persisted in /etc/environment.

    Only key *presence* is checked, not values. *config* is accepted for
    signature parity with the other checkers; the original also computed a
    normalized base_url from it here but never used it — that dead code has
    been removed (value verification would be a follow-up improvement).
    """
    etc_env = "/etc/environment"
    if not os.path.isfile(etc_env):
        return "RED", "/etc/environment not found"

    with open(etc_env) as f:
        content = f.read()

    has_base = "OPENAI_BASE_URL" in content
    has_key = "OPENAI_API_KEY" in content

    if has_base and has_key:
        return "GREEN", "OPENAI_BASE_URL + OPENAI_API_KEY set"

    missing = []
    if not has_base:
        missing.append("OPENAI_BASE_URL")
    if not has_key:
        missing.append("OPENAI_API_KEY")

    # One variable present is a partial (YELLOW) setup; none is critical.
    return "YELLOW" if (has_base or has_key) else "RED", f"Missing: {', '.join(missing)}"
|
||||
|
||||
|
||||
def validate_all(codex_dir, config):
    """Run all six target checks; return [(ConfigTarget, status, message), ...].

    The result order matches PATCH_TARGETS.
    """
    toml_data = _read_toml(os.path.join(codex_dir, "config.toml"))

    # One checker per PATCH_TARGETS entry, in the same order.
    checks = (
        lambda: _check_api_endpoint(toml_data, config),
        lambda: _check_auth(codex_dir, config),
        lambda: _check_analytics(toml_data),
        lambda: _check_approval(toml_data, config),
        lambda: _check_sandbox(toml_data, config),
        lambda: _check_env_vars(config),
    )

    results = []
    for target, check in zip(PATCH_TARGETS, checks):
        status, message = check()
        results.append((target, status, message))
    return results
|
||||
|
||||
|
||||
# ANSI colors
GREEN_C = "\033[92m"
YELLOW_C = "\033[93m"
RED_C = "\033[91m"
BOLD_C = "\033[1m"
RESET_C = "\033[0m"

STATUS_COLORS = {
    "GREEN": GREEN_C,
    "YELLOW": YELLOW_C,
    "RED": RED_C,
}


def print_validation_report(results):
    """Pretty-print validation *results*; return the per-status counts dict.

    *results* is the list of (target, status, message) tuples produced by
    validate_all().
    """
    rule = " " + "─" * 50
    print(f"\n {BOLD_C}Codex Patcher — Validation Report{RESET_C}")
    print(rule)

    counts = {"GREEN": 0, "YELLOW": 0, "RED": 0}
    for target, status, msg in results:
        tint = STATUS_COLORS.get(status, "")
        print(f" {tint}[{status:6s}]{RESET_C} {target.name}: {target.description}")
        if status != "GREEN":
            # Only non-GREEN rows carry the explanatory detail.
            print(f" → {msg}")
        counts[status] = counts.get(status, 0) + 1

    print(rule)
    total = len(results)
    print(f" {GREEN_C}{counts['GREEN']}{RESET_C}/{total} GREEN "
          f"{YELLOW_C}{counts['YELLOW']}{RESET_C} YELLOW "
          f"{RED_C}{counts['RED']}{RESET_C} RED")

    if counts["GREEN"] == total:
        print(f"\n {GREEN_C}All targets configured correctly!{RESET_C}")
    elif counts["RED"] > 0:
        print(f"\n {RED_C}Critical targets missing. Run: python3 codex_patcher.py --apply{RESET_C}")

    return counts
|
||||
Reference in New Issue
Block a user