#!/usr/bin/env python3 """ Codex CLI Patcher — config+env patching for Codex CLI (Rust binary). Unlike Claude Code and Gemini CLI (JavaScript), Codex is a compiled Rust binary. Patching is done via config.toml manipulation and environment variable injection. Targets: 1. api_endpoint — redirect API to custom proxy via model_providers 2. authentication — API key auth via codex login 3. telemetry — disable analytics 4. permissions — bypass approvals + sandbox 5. model_config — model, reasoning, disable auto-update 6. system_env — /etc/environment vars """ import json import os import sys import shutil import platform import subprocess import argparse import tomllib from pathlib import Path from datetime import datetime # ─── Constants ────────────────────────────────────────────────────────── SCRIPT_DIR = Path(__file__).resolve().parent CONFIG_PATH = SCRIPT_DIR / "codex_config.json" IS_WINDOWS = platform.system() == "Windows" IS_MACOS = platform.system() == "Darwin" # ANSI colors GREEN = "\033[92m" YELLOW = "\033[93m" RED = "\033[91m" CYAN = "\033[96m" BOLD = "\033[1m" RESET = "\033[0m" # Managed config keys (we update these, preserve everything else) MANAGED_TOP_KEYS = { "model", "model_reasoning_effort", "model_provider", "approval_policy", "sandbox_mode", "check_for_update_on_startup", "forced_login_method", } MANAGED_SECTIONS = {"analytics", "model_providers"} # ─── Config Loading ───────────────────────────────────────────────────── def load_config(config_path=None): """Load codex_config.json.""" path = Path(config_path) if config_path else CONFIG_PATH if not path.is_file(): print(f"{RED}Config not found: {path}{RESET}") sys.exit(1) with open(path) as f: return json.load(f) # ─── Detection ────────────────────────────────────────────────────────── def detect_codex(): """Find codex binary. Returns (binary_path, version) or exits.""" # Try which/where cmd = "where" if IS_WINDOWS else "which" try: result = subprocess.run( [cmd, "codex"], capture_output=True, text=True, timeout=5 ) binary_path = result.stdout.strip().split("\n")[0] if result.returncode == 0 else None except Exception: binary_path = None if not binary_path: # Common fallback paths for p in ["/usr/local/bin/codex", "/usr/bin/codex"]: if os.path.isfile(p): binary_path = p break if not binary_path: print(f"{RED}Codex CLI not found. Install: https://github.com/openai/codex{RESET}") sys.exit(1) # Get version version = "unknown" try: result = subprocess.run( [binary_path, "--version"], capture_output=True, text=True, timeout=10 ) # Output: "codex-cli 0.111.0" if result.returncode == 0: parts = result.stdout.strip().split() if len(parts) >= 2: version = parts[-1] except Exception: pass return binary_path, version # ─── TOML Read/Write ──────────────────────────────────────────────────── def read_toml(path): """Read TOML file. Returns dict or empty dict if not found/broken.""" if not os.path.isfile(path): return {} try: with open(path, "rb") as f: return tomllib.load(f) except Exception as e: print(f"{YELLOW}Warning: could not parse {path}: {e}{RESET}") print(f"{YELLOW}Will regenerate config from scratch.{RESET}") return {} def read_toml_raw(path): """Read TOML file as raw text. Returns string or empty string.""" if not os.path.isfile(path): return "" with open(path, "r", encoding="utf-8") as f: return f.read() def toml_value(v): """Format a Python value as TOML.""" if isinstance(v, bool): return "true" if v else "false" if isinstance(v, str): return f'"{v}"' if isinstance(v, (int, float)): return str(v) if isinstance(v, list): items = ", ".join(toml_value(i) for i in v) return f"[{items}]" return str(v) def toml_key(k): """Format a TOML key, quoting if it contains dots or special chars.""" if "." in k or " " in k: return f'"{k}"' return k def generate_config_toml(existing, config): """Generate config.toml content, merging with existing user config. Strategy: - Update managed top-level keys - Update managed sections ([analytics], [model_providers.custom]) - Add trust paths to [projects.*] - Preserve all other user-defined content """ lines = ["# Codex CLI Configuration (managed by codex_patcher.py)"] # Top-level managed keys lines.append(f'model = "{config["model"]}"') lines.append(f'model_reasoning_effort = "{config.get("model_reasoning_effort", "high")}"') lines.append('model_provider = "custom"') lines.append(f'approval_policy = "{config.get("approval_policy", "never")}"') lines.append(f'sandbox_mode = "{config.get("sandbox_mode", "danger-full-access")}"') lines.append(f'check_for_update_on_startup = {toml_value(config.get("check_for_update", False))}') lines.append('forced_login_method = "api"') # Preserve existing top-level keys we don't manage for key, val in existing.items(): if key not in MANAGED_TOP_KEYS and not isinstance(val, dict): lines.append(f"{key} = {toml_value(val)}") # [analytics] lines.append("") lines.append("[analytics]") lines.append(f"enabled = {toml_value(config.get('telemetry_enabled', False))}") # [model_providers.custom] base_url = config["base_url"].rstrip("/") if not base_url.endswith("/v1"): base_url += "/v1" lines.append("") lines.append("[model_providers.custom]") lines.append('name = "custom"') lines.append(f'base_url = "{base_url}"') lines.append(f'env_key = "OPENAI_API_KEY"') lines.append(f'wire_api = "{config.get("wire_api", "responses")}"') # Preserve other model_providers mp = existing.get("model_providers", {}) if isinstance(mp, dict): for name, provider in mp.items(): if name == "custom": continue lines.append("") lines.append(f"[model_providers.{name}]") for k, v in provider.items(): lines.append(f"{k} = {toml_value(v)}") # Trust paths trust_paths = config.get("trust_paths", ["/home", "/root", "/tmp"]) existing_projects = existing.get("projects", {}) # Add our trust paths for tp in trust_paths: lines.append("") lines.append(f'[projects."{tp}"]') lines.append('trust_level = "trusted"') # Preserve user's existing project trust entries (that aren't in our list) for path, proj_conf in existing_projects.items(): if path not in trust_paths and isinstance(proj_conf, dict): lines.append("") lines.append(f'[projects."{path}"]') for k, v in proj_conf.items(): lines.append(f"{k} = {toml_value(v)}") # Preserve other sections we don't manage skip_sections = {"analytics", "model_providers", "projects", "notice"} for key, val in existing.items(): if key in skip_sections or key in MANAGED_TOP_KEYS: continue if isinstance(val, dict): lines.append("") lines.append(f"[{key}]") for k, v in val.items(): if isinstance(v, dict): # Nested table lines.append("") lines.append(f"[{key}.{k}]") for kk, vv in v.items(): lines.append(f"{toml_key(kk)} = {toml_value(vv)}") else: lines.append(f"{toml_key(k)} = {toml_value(v)}") # [notice.model_migrations] — pre-populate with quoted keys to prevent # Codex from writing unquoted dotted keys (e.g. gpt-5.4 → gpt-5 → 4) # which breaks TOML parsing lines.append("") lines.append("[notice]") lines.append("[notice.model_migrations]") models = config.get("models", [config["model"]]) for m in models: lines.append(f'{toml_key(m)} = "done"') return "\n".join(lines) + "\n" # ─── Backup ───────────────────────────────────────────────────────────── def backup_file(path): """Create timestamped backup. Returns backup path or None.""" if not os.path.isfile(path): return None ts = datetime.now().strftime("%Y%m%d%H%M%S") backup = f"{path}.backup.{ts}" shutil.copy2(path, backup) return backup # ─── Target 1: API Endpoint ───────────────────────────────────────────── def patch_api_endpoint(codex_dir, config): """Target 1: Configure [model_providers.custom] in config.toml.""" config_path = os.path.join(codex_dir, "config.toml") existing = read_toml(config_path) base_url = config["base_url"].rstrip("/") if not base_url.endswith("/v1"): base_url += "/v1" # Check if already configured mp = existing.get("model_providers", {}) custom = mp.get("custom", {}) if isinstance(mp, dict) else {} if (custom.get("base_url") == base_url and existing.get("model_provider") == "custom"): return True, "Already configured" return True, f"Will set base_url={base_url}" # ─── Target 2: Authentication ─────────────────────────────────────────── def patch_auth(config, home_dir=None): """Target 2: Configure API key auth via codex login --with-api-key.""" api_key = config["api_key"] messages = [] # Set env var for current process os.environ["OPENAI_API_KEY"] = api_key # Run codex login --with-api-key try: env = os.environ.copy() env["OPENAI_API_KEY"] = api_key result = subprocess.run( ["codex", "login", "--with-api-key"], input=api_key + "\n", capture_output=True, text=True, timeout=30, env=env ) if result.returncode == 0: messages.append("codex login: ok") else: # May already be logged in or other issue stderr = result.stderr.strip() if "already" in stderr.lower(): messages.append("codex login: already authenticated") else: messages.append(f"codex login: exit {result.returncode}") except subprocess.TimeoutExpired: messages.append("codex login: timeout (30s)") except FileNotFoundError: messages.append("codex login: binary not found") except Exception as e: messages.append(f"codex login: {e}") return True, "; ".join(messages) # ─── Target 3: Telemetry ──────────────────────────────────────────────── def patch_telemetry(codex_dir, config): """Target 3: Disable analytics in config.toml.""" config_path = os.path.join(codex_dir, "config.toml") existing = read_toml(config_path) analytics = existing.get("analytics", {}) if isinstance(analytics, dict) and analytics.get("enabled") is False: return True, "Already disabled" return True, "Will disable analytics" # ─── Target 4: Permissions ────────────────────────────────────────────── def patch_permissions(codex_dir, config): """Target 4: Set approval_policy=never, sandbox=danger-full-access.""" config_path = os.path.join(codex_dir, "config.toml") existing = read_toml(config_path) policy = config.get("approval_policy", "never") sandbox = config.get("sandbox_mode", "danger-full-access") if (existing.get("approval_policy") == policy and existing.get("sandbox_mode") == sandbox): return True, "Already configured" return True, f"Will set approval={policy}, sandbox={sandbox}" # ─── Target 5: Model Config ───────────────────────────────────────────── def patch_model_config(codex_dir, config): """Target 5: Set model, reasoning_effort, disable auto-update.""" config_path = os.path.join(codex_dir, "config.toml") existing = read_toml(config_path) model = config["model"] effort = config.get("model_reasoning_effort", "high") if (existing.get("model") == model and existing.get("model_reasoning_effort") == effort and existing.get("check_for_update_on_startup") is False): return True, "Already configured" return True, f"Will set model={model}, effort={effort}" # ─── Target 6: System Environment ─────────────────────────────────────── def setup_env_vars(config): """Target 6: Set OPENAI_BASE_URL and OPENAI_API_KEY in /etc/environment.""" base_url = config["base_url"].rstrip("/") if not base_url.endswith("/v1"): base_url += "/v1" env_vars = { "OPENAI_BASE_URL": base_url, "OPENAI_API_KEY": config["api_key"], } if IS_WINDOWS: # Use setx for Windows count = 0 for key, val in env_vars.items(): try: subprocess.run( ["setx", key, val, "/M"], capture_output=True, timeout=10 ) count += 1 except Exception: pass return count > 0, f"Set {count} env var(s) via setx" # Linux/macOS: /etc/environment etc_env = "/etc/environment" try: content = "" if os.path.isfile(etc_env): with open(etc_env, "r") as f: content = f.read() changed = False for key, val in env_vars.items(): line = f'{key}="{val}"' if key in content: # Update existing new_lines = [] for l in content.split("\n"): if l.startswith(f"{key}="): if l != line: new_lines.append(line) changed = True else: new_lines.append(l) else: new_lines.append(l) content = "\n".join(new_lines) else: content = content.rstrip("\n") + "\n" + line + "\n" changed = True if changed: with open(etc_env, "w") as f: f.write(content) return True, f"Set {len(env_vars)} env var(s) in {etc_env}" else: return True, "Env vars already set" except PermissionError: return False, f"Permission denied: {etc_env} (run as root)" except Exception as e: return False, f"Error: {e}" # ─── Apply All Patches ────────────────────────────────────────────────── def apply_all_patches(config, home_dir=None): """Apply all 6 patch targets. Returns (all_ok, results_dict).""" if home_dir is None: home_dir = os.path.expanduser("~") codex_dir = os.path.join(home_dir, ".codex") os.makedirs(codex_dir, exist_ok=True) config_path = os.path.join(codex_dir, "config.toml") results = {} all_ok = True binary_path, version = detect_codex() print(f"\n{BOLD}Codex CLI Patcher{RESET}") print(f" Version: {CYAN}{version}{RESET}") print(f" Binary: {binary_path}") print(f" Proxy: {config['base_url']}") print() # Read existing config existing = read_toml(config_path) # Backup before any changes backup_file(config_path) # Generate new config.toml (merge) new_content = generate_config_toml(existing, config) # Write config.toml with open(config_path, "w", encoding="utf-8") as f: f.write(new_content) # Target 1: API endpoint ok, msg = patch_api_endpoint(codex_dir, config) results["api_endpoint"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 1: {msg}") if not ok: all_ok = False # Target 2: Authentication ok, msg = patch_auth(config, home_dir) results["authentication"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 2: {msg}") if not ok: all_ok = False # Target 3: Telemetry ok, msg = patch_telemetry(codex_dir, config) results["telemetry"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 3: {msg}") if not ok: all_ok = False # Target 4: Permissions ok, msg = patch_permissions(codex_dir, config) results["permissions"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 4: {msg}") if not ok: all_ok = False # Target 5: Model config ok, msg = patch_model_config(codex_dir, config) results["model_config"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 5: {msg}") if not ok: all_ok = False # Target 6: System env ok, msg = setup_env_vars(config) results["system_env"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 6: {msg}") if not ok: all_ok = False print() if all_ok: print(f" {GREEN}All patches applied successfully!{RESET}") else: print(f" {RED}Some patches failed. Check output above.{RESET}") return all_ok, results # ─── Rollback ──────────────────────────────────────────────────────────── def rollback(home_dir=None): """Restore config.toml from latest backup.""" if home_dir is None: home_dir = os.path.expanduser("~") codex_dir = os.path.join(home_dir, ".codex") config_path = os.path.join(codex_dir, "config.toml") # Find latest backup backups = sorted(Path(codex_dir).glob("config.toml.backup.*"), reverse=True) if not backups: print(f"{RED}No backups found in {codex_dir}{RESET}") return False latest = backups[0] shutil.copy2(str(latest), config_path) print(f"{GREEN}Restored from {latest.name}{RESET}") return True # ─── Multi-User Support ───────────────────────────────────────────────── def list_users(): """List system users with .codex/ or home dirs.""" users = [] try: import pwd for pw in pwd.getpwall(): home = pw.pw_dir if not os.path.isdir(home): continue if pw.pw_uid < 1000 and pw.pw_uid != 0: continue if pw.pw_shell in ("/usr/sbin/nologin", "/bin/false"): continue users.append(pw) except ImportError: pass return users def patch_user(user_home, config): """Patch a single user's ~/.codex/ config.""" codex_dir = os.path.join(user_home, ".codex") os.makedirs(codex_dir, exist_ok=True) config_path = os.path.join(codex_dir, "config.toml") existing = read_toml(config_path) backup_file(config_path) new_content = generate_config_toml(existing, config) with open(config_path, "w", encoding="utf-8") as f: f.write(new_content) return True # ─── CLI ───────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="Codex CLI Patcher — route Codex through custom AI proxy" ) parser.add_argument("--apply", action="store_true", help="Apply all patches") parser.add_argument("--all", action="store_true", help="Patch all user accounts") parser.add_argument("--rollback", action="store_true", help="Restore from backup") parser.add_argument("--detect", action="store_true", help="Detect Codex installation") parser.add_argument("--validate", action="store_true", help="Validate config state") parser.add_argument("--config", type=str, help="Path to codex_config.json") parser.add_argument("--yes", action="store_true", help="Non-interactive mode") args = parser.parse_args() config = load_config(args.config) if args.detect: binary_path, version = detect_codex() print(f"Binary: {binary_path}") print(f"Version: {version}") return 0 if args.rollback: return 0 if rollback() else 1 if args.validate: # Import validator sys.path.insert(0, str(SCRIPT_DIR)) from updater.config_validator import validate_all, print_validation_report codex_dir = os.path.expanduser("~/.codex") results = validate_all(codex_dir, config) print_validation_report(results) return 0 if all(r[1] == "GREEN" for r in results) else 1 if args.apply: # Apply for current user ok, results = apply_all_patches(config) # Patch other users if --all if args.all: for user in list_users(): if user.pw_dir == os.path.expanduser("~"): continue try: patch_user(user.pw_dir, config) print(f" Patched {user.pw_name}: {user.pw_dir}/.codex/config.toml") except Exception as e: print(f" {RED}Failed {user.pw_name}: {e}{RESET}") return 0 if ok else 1 parser.print_help() return 0 if __name__ == "__main__": raise SystemExit(main())