#!/usr/bin/env python3 """ Codex CLI Patcher — config+env patching for Codex CLI (Rust binary). Unlike Claude Code and Gemini CLI (JavaScript), Codex is a compiled Rust binary. Patching is done via config.toml manipulation and environment variable injection. Targets: 1. api_endpoint — redirect API to custom proxy via model_providers 2. authentication — API key auth via codex login 3. telemetry — disable analytics 4. permissions — bypass approvals + sandbox 5. model_config — model, reasoning, disable auto-update 6. system_env — /etc/environment vars """ import json import os import sys import shutil import platform import subprocess import argparse try: import tomllib except ModuleNotFoundError: try: import tomli as tomllib except ModuleNotFoundError: # Minimal TOML reader for Python < 3.11 import re as _re class _T: @staticmethod def load(f): raw = f.read() return _T._parse(raw.decode("utf-8") if isinstance(raw, bytes) else raw) @staticmethod def loads(s): return _T._parse(s) @staticmethod def _parse(text): result, cur = {}, None for line in text.split("\n"): line = line.strip() if not line or line.startswith("#"): continue m = _re.match(r'^\[([^\]]+)\]$', line) if m: keys = [k.strip().strip('"') for k in m.group(1).split(".")] cur = result for k in keys: cur = cur.setdefault(k, {}) continue m = _re.match(r'^([^=]+?)\s*=\s*(.+)$', line) if m and cur is not None: k, v = m.group(1).strip(), m.group(2).strip() if v.startswith('"') and v.endswith('"'): v = v[1:-1] elif v == "true": v = True elif v == "false": v = False elif _re.match(r'^-?\d+$', v): v = int(v) elif v.startswith("[") and v.endswith("]"): inner = v[1:-1].strip() v = [x.strip().strip('"') for x in inner.split(",")] if inner else [] cur[k] = v elif m: k, v = m.group(1).strip(), m.group(2).strip() if v.startswith('"') and v.endswith('"'): v = v[1:-1] elif v == "true": v = True elif v == "false": v = False elif _re.match(r'^-?\d+$', v): v = int(v) result[k] = v return result tomllib = _T() from pathlib import Path from datetime import datetime # ─── Constants ────────────────────────────────────────────────────────── SCRIPT_DIR = Path(__file__).resolve().parent CONFIG_PATH = SCRIPT_DIR / "codex_config.json" IS_WINDOWS = platform.system() == "Windows" IS_MACOS = platform.system() == "Darwin" # ANSI colors GREEN = "\033[92m" YELLOW = "\033[93m" RED = "\033[91m" CYAN = "\033[96m" BOLD = "\033[1m" RESET = "\033[0m" # Managed config keys (we update these, preserve everything else) MANAGED_TOP_KEYS = { "model", "model_reasoning_effort", "model_provider", "model_catalog_json", "approval_policy", "sandbox_mode", "check_for_update_on_startup", "forced_login_method", } MANAGED_SECTIONS = {"analytics", "model_providers"} # Model catalog template (Codex internal format from codex-rs/core/models.json) MODEL_TEMPLATE = { "prefer_websockets": False, "support_verbosity": True, "default_verbosity": "low", "apply_patch_tool_type": "freeform", "input_modalities": ["text", "image"], "supports_image_detail_original": True, "truncation_policy": {"mode": "tokens", "limit": 10000}, "supports_parallel_tool_calls": True, "context_window": 272000, "default_reasoning_summary": "none", "shell_type": "shell_command", "visibility": "list", "supported_in_api": True, "availability_nux": None, "upgrade": None, "priority": 0, "base_instructions": "", "model_messages": None, "experimental_supported_tools": [], "supports_reasoning_summaries": True, "supported_reasoning_levels": [ {"effort": "low", "description": "Fast responses with lighter reasoning"}, {"effort": "medium", "description": "Balances speed and reasoning depth"}, {"effort": "high", "description": "Greater reasoning depth for complex problems"}, {"effort": "xhigh", "description": "Extra high reasoning depth"}, ], "default_reasoning_level": "medium", } def generate_model_catalog(models): """Generate model catalog JSON in Codex internal format.""" entries = [] for i, slug in enumerate(models): entry = dict(MODEL_TEMPLATE) entry["slug"] = slug entry["display_name"] = slug entry["description"] = f"Model {slug}" entry["priority"] = i entries.append(entry) return {"models": entries} # ─── Config Loading ───────────────────────────────────────────────────── def load_config(config_path=None): """Load codex_config.json.""" path = Path(config_path) if config_path else CONFIG_PATH if not path.is_file(): print(f"{RED}Config not found: {path}{RESET}") sys.exit(1) with open(path) as f: return json.load(f) # ─── Detection ────────────────────────────────────────────────────────── def detect_codex(): """Find codex binary. Returns (binary_path, version) or exits.""" # Try which/where cmd = "where" if IS_WINDOWS else "which" try: result = subprocess.run( [cmd, "codex"], capture_output=True, text=True, timeout=5 ) binary_path = result.stdout.strip().split("\n")[0] if result.returncode == 0 else None except Exception: binary_path = None if not binary_path: # Common fallback paths for p in ["/usr/local/bin/codex", "/usr/bin/codex"]: if os.path.isfile(p): binary_path = p break if not binary_path: print(f"{RED}Codex CLI not found. Install: https://github.com/openai/codex{RESET}") sys.exit(1) # Get version version = "unknown" try: result = subprocess.run( [binary_path, "--version"], capture_output=True, text=True, timeout=10 ) # Output: "codex-cli 0.111.0" if result.returncode == 0: parts = result.stdout.strip().split() if len(parts) >= 2: version = parts[-1] except Exception: pass return binary_path, version # ─── TOML Read/Write ──────────────────────────────────────────────────── def read_toml(path): """Read TOML file. Returns dict or empty dict if not found/broken.""" if not os.path.isfile(path): return {} try: with open(path, "rb") as f: return tomllib.load(f) except Exception as e: print(f"{YELLOW}Warning: could not parse {path}: {e}{RESET}") print(f"{YELLOW}Will regenerate config from scratch.{RESET}") return {} def read_toml_raw(path): """Read TOML file as raw text. Returns string or empty string.""" if not os.path.isfile(path): return "" with open(path, "r", encoding="utf-8") as f: return f.read() def _toml_str(s): """Escape a Python string as a TOML basic-string literal. Backslashes (Windows paths!) and double-quotes must be escaped per TOML spec — otherwise the parser sees `C:\\Windows` as `C:Windows` or fails with `Unescaped '\\' in a string`. """ escaped = s.replace("\\", "\\\\").replace('"', '\\"') return f'"{escaped}"' def toml_value(v): """Format a Python value as TOML. Dicts → inline table `{ key = "val", k2 = "v2" }` (NOT Python str(dict) which uses single quotes + colons and breaks `tomllib.load`). Strings → basic-string with backslash/quote escapes (Windows paths). """ if isinstance(v, bool): return "true" if v else "false" if isinstance(v, str): return _toml_str(v) if isinstance(v, (int, float)): return str(v) if isinstance(v, list): items = ", ".join(toml_value(i) for i in v) return f"[{items}]" if isinstance(v, dict): # TOML inline table: keys may need quoting (special chars / dots), # values recurse through toml_value. items = ", ".join( f"{toml_key(k)} = {toml_value(val)}" for k, val in v.items() ) return f"{{ {items} }}" if items else "{}" return _toml_str(str(v)) def toml_key(k): """Format a TOML key, quoting if it contains anything other than the bare-key charset (A-Z a-z 0-9 _ -). Examples that MUST be quoted: "PROGRAMFILES(X86)" — parens "key.with.dots" — would parse as nested "key with spaces" """ import re as _re if _re.fullmatch(r"[A-Za-z0-9_-]+", k): return k # Quote, escape backslashes and quotes escaped = k.replace("\\", "\\\\").replace('"', '\\"') return f'"{escaped}"' def generate_config_toml(existing, config, home_dir=None): """Generate config.toml content, merging with existing user config. Strategy: - Update managed top-level keys - Update managed sections ([analytics], [model_providers.custom]) - Add trust paths to [projects.*] - Preserve all other user-defined content """ lines = ["# Codex CLI Configuration (managed by codex_patcher.py)"] # Top-level managed keys lines.append(f'model = "{config["model"]}"') lines.append(f'model_reasoning_effort = "{config.get("model_reasoning_effort", "xhigh")}"') lines.append('model_provider = "custom"') # Model catalog path (for model picker) codex_dir_path = os.path.join(home_dir or os.path.expanduser("~"), ".codex") catalog_path = os.path.join(codex_dir_path, "model_catalog.json") catalog_path_toml = catalog_path.replace("\\", "/") lines.append(f'model_catalog_json = "{catalog_path_toml}"') lines.append(f'approval_policy = "{config.get("approval_policy", "never")}"') lines.append(f'sandbox_mode = "{config.get("sandbox_mode", "danger-full-access")}"') lines.append(f'check_for_update_on_startup = {toml_value(config.get("check_for_update", False))}') lines.append('forced_login_method = "api"') # Preserve existing top-level keys we don't manage for key, val in existing.items(): if key not in MANAGED_TOP_KEYS and not isinstance(val, dict): lines.append(f"{key} = {toml_value(val)}") # [analytics] lines.append("") lines.append("[analytics]") lines.append(f"enabled = {toml_value(config.get('telemetry_enabled', False))}") # [model_providers.custom] base_url = config["base_url"].rstrip("/") if not base_url.endswith("/v1"): base_url += "/v1" lines.append("") lines.append("[model_providers.custom]") lines.append('name = "custom"') lines.append(f'base_url = "{base_url}"') lines.append(f'env_key = "OPENAI_API_KEY"') lines.append(f'wire_api = "{config.get("wire_api", "responses")}"') # Preserve other model_providers mp = existing.get("model_providers", {}) if isinstance(mp, dict): for name, provider in mp.items(): if name == "custom": continue lines.append("") lines.append(f"[model_providers.{name}]") for k, v in provider.items(): lines.append(f"{k} = {toml_value(v)}") # Trust paths (add platform-specific paths) trust_paths = list(config.get("trust_paths", ["/home", "/root", "/tmp"])) if IS_MACOS: for mp in ["/Users", "/var/root"]: if mp not in trust_paths: trust_paths.append(mp) existing_projects = existing.get("projects", {}) # Add our trust paths for tp in trust_paths: lines.append("") lines.append(f'[projects."{tp}"]') lines.append('trust_level = "trusted"') # Preserve user's existing project trust entries (that aren't in our list) for path, proj_conf in existing_projects.items(): if path not in trust_paths and isinstance(proj_conf, dict): lines.append("") lines.append(f'[projects."{path}"]') for k, v in proj_conf.items(): lines.append(f"{k} = {toml_value(v)}") # Preserve other sections we don't manage skip_sections = {"analytics", "model_providers", "projects", "notice"} for key, val in existing.items(): if key in skip_sections or key in MANAGED_TOP_KEYS: continue if isinstance(val, dict): lines.append("") lines.append(f"[{key}]") for k, v in val.items(): if isinstance(v, dict): # Nested table lines.append("") lines.append(f"[{key}.{k}]") for kk, vv in v.items(): lines.append(f"{toml_key(kk)} = {toml_value(vv)}") else: lines.append(f"{toml_key(k)} = {toml_value(v)}") # [notice.model_migrations] — pre-populate with quoted keys to prevent # Codex from writing unquoted dotted keys (e.g. gpt-5.4 → gpt-5 → 4) # which breaks TOML parsing lines.append("") lines.append("[notice]") lines.append("[notice.model_migrations]") models = config.get("models", [config["model"]]) for m in models: lines.append(f'{toml_key(m)} = "done"') return "\n".join(lines) + "\n" # ─── Backup ───────────────────────────────────────────────────────────── def backup_file(path): """Create timestamped backup. Returns backup path or None.""" if not os.path.isfile(path): return None ts = datetime.now().strftime("%Y%m%d%H%M%S") backup = f"{path}.backup.{ts}" shutil.copy2(path, backup) return backup # ─── Target 1: API Endpoint ───────────────────────────────────────────── def patch_api_endpoint(codex_dir, config): """Target 1: Configure [model_providers.custom] in config.toml.""" config_path = os.path.join(codex_dir, "config.toml") existing = read_toml(config_path) base_url = config["base_url"].rstrip("/") if not base_url.endswith("/v1"): base_url += "/v1" # Check if already configured mp = existing.get("model_providers", {}) custom = mp.get("custom", {}) if isinstance(mp, dict) else {} if (custom.get("base_url") == base_url and existing.get("model_provider") == "custom"): return True, "Already configured" return True, f"Will set base_url={base_url}" # ─── Target 2: Authentication ─────────────────────────────────────────── def patch_auth(config, home_dir=None): """Target 2: Configure API key auth via codex login --with-api-key.""" api_key = config["api_key"] messages = [] # Set env var for current process os.environ["OPENAI_API_KEY"] = api_key # Run codex login --with-api-key try: env = os.environ.copy() env["OPENAI_API_KEY"] = api_key result = subprocess.run( ["codex", "login", "--with-api-key"], input=api_key + "\n", capture_output=True, text=True, timeout=30, env=env ) if result.returncode == 0: messages.append("codex login: ok") else: # May already be logged in or other issue stderr = result.stderr.strip() if "already" in stderr.lower(): messages.append("codex login: already authenticated") else: messages.append(f"codex login: exit {result.returncode}") except subprocess.TimeoutExpired: messages.append("codex login: timeout (30s)") except FileNotFoundError: messages.append("codex login: binary not found") except Exception as e: messages.append(f"codex login: {e}") return True, "; ".join(messages) # ─── Target 3: Telemetry ──────────────────────────────────────────────── def patch_telemetry(codex_dir, config): """Target 3: Disable analytics in config.toml.""" config_path = os.path.join(codex_dir, "config.toml") existing = read_toml(config_path) analytics = existing.get("analytics", {}) if isinstance(analytics, dict) and analytics.get("enabled") is False: return True, "Already disabled" return True, "Will disable analytics" # ─── Target 4: Permissions ────────────────────────────────────────────── def patch_permissions(codex_dir, config): """Target 4: Set approval_policy=never, sandbox=danger-full-access.""" config_path = os.path.join(codex_dir, "config.toml") existing = read_toml(config_path) policy = config.get("approval_policy", "never") sandbox = config.get("sandbox_mode", "danger-full-access") if (existing.get("approval_policy") == policy and existing.get("sandbox_mode") == sandbox): return True, "Already configured" return True, f"Will set approval={policy}, sandbox={sandbox}" # ─── Target 5: Model Config ───────────────────────────────────────────── def patch_model_config(codex_dir, config): """Target 5: Set model, reasoning_effort, disable auto-update.""" config_path = os.path.join(codex_dir, "config.toml") existing = read_toml(config_path) model = config["model"] effort = config.get("model_reasoning_effort", "xhigh") if (existing.get("model") == model and existing.get("model_reasoning_effort") == effort and existing.get("check_for_update_on_startup") is False): return True, "Already configured" return True, f"Will set model={model}, effort={effort}" # ─── Target 6: System Environment ─────────────────────────────────────── def setup_env_vars(config): """Target 6: Set OPENAI_BASE_URL and OPENAI_API_KEY in /etc/environment.""" base_url = config["base_url"].rstrip("/") if not base_url.endswith("/v1"): base_url += "/v1" env_vars = { "OPENAI_BASE_URL": base_url, "OPENAI_API_KEY": config["api_key"], } if IS_WINDOWS: # Use setx for Windows count = 0 for key, val in env_vars.items(): try: subprocess.run( ["setx", key, val, "/M"], capture_output=True, timeout=10 ) count += 1 except Exception: pass return count > 0, f"Set {count} env var(s) via setx" # Linux/macOS: /etc/environment etc_env = "/etc/environment" try: content = "" if os.path.isfile(etc_env): with open(etc_env, "r") as f: content = f.read() changed = False for key, val in env_vars.items(): line = f'{key}="{val}"' if key in content: # Update existing new_lines = [] for l in content.split("\n"): if l.startswith(f"{key}="): if l != line: new_lines.append(line) changed = True else: new_lines.append(l) else: new_lines.append(l) content = "\n".join(new_lines) else: content = content.rstrip("\n") + "\n" + line + "\n" changed = True if changed: with open(etc_env, "w") as f: f.write(content) return True, f"Set {len(env_vars)} env var(s) in {etc_env}" else: return True, "Env vars already set" except PermissionError: return False, f"Permission denied: {etc_env} (run as root)" except Exception as e: return False, f"Error: {e}" # ─── Apply All Patches ────────────────────────────────────────────────── def apply_all_patches(config, home_dir=None): """Apply all 6 patch targets. Returns (all_ok, results_dict).""" if home_dir is None: home_dir = os.path.expanduser("~") codex_dir = os.path.join(home_dir, ".codex") os.makedirs(codex_dir, exist_ok=True) config_path = os.path.join(codex_dir, "config.toml") results = {} all_ok = True binary_path, version = detect_codex() print(f"\n{BOLD}Codex CLI Patcher{RESET}") print(f" Version: {CYAN}{version}{RESET}") print(f" Binary: {binary_path}") print(f" Proxy: {config['base_url']}") print() # Generate model catalog JSON for model picker (Codex internal format) catalog_path = os.path.join(codex_dir, "model_catalog.json") models = config.get("models", [config["model"]]) catalog = generate_model_catalog(models) with open(catalog_path, "w", encoding="utf-8") as f: json.dump(catalog, f, indent=2) print(f" {'[OK]':>8} Catalog: {catalog_path} ({len(models)} models)") # Read existing config existing = read_toml(config_path) # Backup before any changes backup_file(config_path) # Generate new config.toml (merge) new_content = generate_config_toml(existing, config, home_dir=home_dir) # Write config.toml with open(config_path, "w", encoding="utf-8") as f: f.write(new_content) # Target 1: API endpoint ok, msg = patch_api_endpoint(codex_dir, config) results["api_endpoint"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 1: {msg}") if not ok: all_ok = False # Target 2: Authentication ok, msg = patch_auth(config, home_dir) results["authentication"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 2: {msg}") if not ok: all_ok = False # Target 3: Telemetry ok, msg = patch_telemetry(codex_dir, config) results["telemetry"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 3: {msg}") if not ok: all_ok = False # Target 4: Permissions ok, msg = patch_permissions(codex_dir, config) results["permissions"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 4: {msg}") if not ok: all_ok = False # Target 5: Model config ok, msg = patch_model_config(codex_dir, config) results["model_config"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 5: {msg}") if not ok: all_ok = False # Target 6: System env ok, msg = setup_env_vars(config) results["system_env"] = (ok, msg) print(f" {'[OK]' if ok else '[FAIL]':>8} Target 6: {msg}") if not ok: all_ok = False print() if all_ok: print(f" {GREEN}All patches applied successfully!{RESET}") else: print(f" {RED}Some patches failed. Check output above.{RESET}") return all_ok, results # ─── Rollback ──────────────────────────────────────────────────────────── def rollback(home_dir=None): """Restore config.toml from latest backup.""" if home_dir is None: home_dir = os.path.expanduser("~") codex_dir = os.path.join(home_dir, ".codex") config_path = os.path.join(codex_dir, "config.toml") # Find latest backup backups = sorted(Path(codex_dir).glob("config.toml.backup.*"), reverse=True) if not backups: print(f"{RED}No backups found in {codex_dir}{RESET}") return False latest = backups[0] shutil.copy2(str(latest), config_path) print(f"{GREEN}Restored from {latest.name}{RESET}") return True # ─── Multi-User Support ───────────────────────────────────────────────── def list_user_homes(): """List home directories of real users. Returns list of (home_dir, uid, gid).""" homes = [] if IS_MACOS: # macOS: pwd.getpwall() is unreliable (Directory Services). # Scan /Users/ directly + /var/root for root. for base in ["/Users", "/var/root"]: if base == "/var/root": if os.path.isdir(base): info = os.stat(base) homes.append((base, info.st_uid, info.st_gid)) continue if not os.path.isdir(base): continue for name in os.listdir(base): if name.startswith(".") or name == "Shared": continue udir = os.path.join(base, name) if os.path.isdir(udir): info = os.stat(udir) homes.append((udir, info.st_uid, info.st_gid)) else: # Linux: use pwd try: import pwd for pw in pwd.getpwall(): if not os.path.isdir(pw.pw_dir): continue if pw.pw_uid < 1000 and pw.pw_uid != 0: continue if pw.pw_shell in ("/usr/sbin/nologin", "/bin/false"): continue homes.append((pw.pw_dir, pw.pw_uid, pw.pw_gid)) except ImportError: # Fallback: scan /home + /root for base in ["/home", "/root"]: if base == "/root": if os.path.isdir(base): homes.append((base, 0, 0)) continue if not os.path.isdir(base): continue for name in os.listdir(base): udir = os.path.join(base, name) if os.path.isdir(udir): info = os.stat(udir) homes.append((udir, info.st_uid, info.st_gid)) return homes def patch_user(user_home, config, uid=None, gid=None): """Patch a single user's ~/.codex/ config.""" codex_dir = os.path.join(user_home, ".codex") os.makedirs(codex_dir, exist_ok=True) # Generate model catalog catalog_path = os.path.join(codex_dir, "model_catalog.json") models = config.get("models", [config["model"]]) with open(catalog_path, "w", encoding="utf-8") as f: json.dump(generate_model_catalog(models), f, indent=2) config_path = os.path.join(codex_dir, "config.toml") existing = read_toml(config_path) backup_file(config_path) new_content = generate_config_toml(existing, config, home_dir=user_home) with open(config_path, "w", encoding="utf-8") as f: f.write(new_content) # Fix ownership so files belong to the user, not root if uid is not None and gid is not None: for path in [codex_dir, config_path, catalog_path]: if os.path.exists(path): os.chown(path, uid, gid) return True # ─── CLI ───────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="Codex CLI Patcher — route Codex through custom AI proxy" ) parser.add_argument("--apply", action="store_true", help="Apply all patches") parser.add_argument("--all", action="store_true", help="Patch all user accounts") parser.add_argument("--rollback", action="store_true", help="Restore from backup") parser.add_argument("--detect", action="store_true", help="Detect Codex installation") parser.add_argument("--validate", action="store_true", help="Validate config state") parser.add_argument("--config", type=str, help="Path to codex_config.json") parser.add_argument("--yes", action="store_true", help="Non-interactive mode") args = parser.parse_args() config = load_config(args.config) if args.detect: binary_path, version = detect_codex() print(f"Binary: {binary_path}") print(f"Version: {version}") return 0 if args.rollback: return 0 if rollback() else 1 if args.validate: # Import validator sys.path.insert(0, str(SCRIPT_DIR)) from updater.config_validator import validate_all, print_validation_report codex_dir = os.path.expanduser("~/.codex") results = validate_all(codex_dir, config) print_validation_report(results) return 0 if all(r[1] == "GREEN" for r in results) else 1 if args.apply: # Apply for current user ok, results = apply_all_patches(config) # Patch other users if --all if args.all: my_home = os.path.expanduser("~") for home_dir, uid, gid in list_user_homes(): if home_dir == my_home: continue try: patch_user(home_dir, config, uid=uid, gid=gid) print(f" Patched: {home_dir}/.codex/config.toml (uid={uid})") except Exception as e: print(f" {RED}Failed {home_dir}: {e}{RESET}") return 0 if ok else 1 parser.print_help() return 0 if __name__ == "__main__": raise SystemExit(main())