#!/usr/bin/env python3
"""
vpn.py -- Manage WireGuard VPN server and peers.

Reads core.json for VPN interface configuration. Any VLAN whose interface
name starts with "wg" is treated as a WireGuard interface.

Peer data is stored in per-interface dotfiles (.vpn-wg0, .vpn-wg1, etc.) in
the same directory as this script.

Server private keys:
    Stored at /etc/wireguard/<iface>.key (root read-only, 600).
    Generated once on --apply if not already present.

Peer key model (WireGuard is symmetric):
    Each peer generates a keypair. The peer's PRIVATE key is embedded in
    their client config file and must be transferred to them securely.
    The peer's PUBLIC key is stored in the dotfile and written into the
    WireGuard conf. The server's public key is embedded in each client
    config.

Client config files:
    Generated as vpn-client-<name>.conf in the same directory as vpn.py.
    Permissions: 600 (root read-only). Contains the peer's private key --
    transfer to the client by secure means (e.g. encrypted email, USB),
    then delete from this server.

Usage:
    sudo python3 vpn.py --add-peer      Add a new peer interactively
    sudo python3 vpn.py --manage-peers  List and manage existing peers
    sudo python3 vpn.py --apply         Write WireGuard config and sync peers
    sudo python3 vpn.py --disable       Stop WireGuard on all interfaces
    sudo python3 vpn.py --status        Show WireGuard service and interface status
    sudo python3 vpn.py --view-peers    Show per-peer handshake and traffic stats
"""

import argparse
import ipaddress
import json
import os
import re
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

SCRIPT_DIR = Path(__file__).parent
DHCP_CONFIG_FILE = SCRIPT_DIR / "core.json"
DDNS_CONFIG_FILE = SCRIPT_DIR / "ddns.json"
WG_DIR = Path("/etc/wireguard")
KEEPALIVE = 25

# Every WireGuard subnet handled by this script is assumed to be a /24
# built around vpn_information.gateway.
VPN_PREFIXLEN = 24
# Tunnel MTU used when the WAN MTU cannot be sensed.
DEFAULT_TUNNEL_MTU = 1420
# Bytes subtracted from the WAN link MTU to obtain the tunnel MTU.
WG_IPV4_OVERHEAD = 60


# ------------------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------------------

def die(msg):
    """Print an error message and exit with status 1."""
    print(f"ERROR: {msg}")
    sys.exit(1)


def check_root():
    """Exit unless the effective user is root."""
    if os.geteuid() != 0:
        die("This script must be run as root (sudo).")


def chown_to_script_dir_owner(path):
    """Chown a file to the owner of the script directory.

    Keeps SCRIPT_DIR files user-owned even when running as root.
    /etc/wireguard files are intentionally excluded -- they stay root-owned.
    """
    try:
        owner = SCRIPT_DIR.stat()
        os.chown(path, owner.st_uid, owner.st_gid)
    except OSError:
        pass  # non-fatal


def run(cmd, check=True, capture=True):
    """Run *cmd* (an argv list) in text mode and return the CompletedProcess.

    With check=True a non-zero exit raises subprocess.CalledProcessError.
    """
    return subprocess.run(cmd, capture_output=capture, text=True, check=check)


def wg_available():
    """Return True if the `wg` executable is on PATH."""
    return shutil.which("wg") is not None


def _write_private_file(path, content):
    """Write *content* to *path* so that it is never readable by other users.

    The file is created with mode 600 instead of being written with the
    default umask and chmod-ed afterwards, which would leave secrets briefly
    exposed.
    """
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        # The mode passed to os.open() only applies to newly created files;
        # tighten a pre-existing file before any secret is written to it.
        os.fchmod(fd, 0o600)
        f.write(content)


def _load_json(path):
    """Parse the JSON file at *path*; exit with a clear message on failure."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
        die(f"Cannot read {path}: {e}")


def sense_mtu(dhcp_data):
    """
    Derive the recommended WireGuard tunnel MTU from the WAN interface MTU.

    Reads wan_interface from the core.json general block and parses the
    output of `ip link show <wan>`. The tunnel MTU is the link MTU minus 60
    bytes (WireGuard overhead over an IPv4 transport).

    Falls back to 1420 if no WAN interface is configured or sensing fails.
    NOTE(review): 1420 is more conservative than 1500 - 60; it appears to
    leave room for an IPv6 transport as well -- confirm this is intended.
    """
    wan = dhcp_data.get("general", {}).get("wan_interface", "")
    if not wan or not isinstance(wan, str):
        return DEFAULT_TUNNEL_MTU
    try:
        result = subprocess.run(
            ["ip", "link", "show", wan], capture_output=True, text=True
        )
    except (OSError, ValueError):
        # `ip` missing or an unusable interface name: sensing is best-effort.
        return DEFAULT_TUNNEL_MTU
    m = re.search(r"\bmtu\s+(\d+)", result.stdout)
    if m:
        return int(m.group(1)) - WG_IPV4_OVERHEAD
    return DEFAULT_TUNNEL_MTU


def check_wireguard_tools():
    """Ensure wireguard-tools is installed; prompt to install via apt if not."""
    if wg_available():
        return
    print("wireguard-tools is not installed (provides wg and wg-quick).")
    print()
    while True:
        choice = input("Install wireguard-tools now via apt? [y/N]: ").strip().lower()
        if choice in ("y", "yes"):
            break
        if choice in ("n", "no", ""):
            die("Cannot continue without wireguard-tools. Install it and retry.")
    result = subprocess.run(["apt-get", "install", "-y", "wireguard-tools"])
    if result.returncode != 0:
        die("Package installation failed. Install wireguard-tools manually and retry.")


def _fmt_bytes(n):
    """Format a byte count using binary multiples (B, KB, MB, GB)."""
    if n < 1024:
        return f"{n} B"
    elif n < 1024 ** 2:
        return f"{n / 1024:.1f} KB"
    elif n < 1024 ** 3:
        return f"{n / 1024**2:.1f} MB"
    else:
        return f"{n / 1024**3:.2f} GB"


def _fmt_age(seconds):
    """Format an age in seconds as a coarse 'Ns/Nm/Nh/Nd ago' string."""
    # Clock skew can put a handshake slightly in the future; never show a
    # negative age.
    seconds = max(0, seconds)
    if seconds < 60:
        return f"{seconds}s ago"
    if seconds < 3600:
        return f"{seconds // 60}m ago"
    if seconds < 86400:
        return f"{seconds // 3600}h ago"
    return f"{seconds // 86400}d ago"


# ------------------------------------------------------------------------------
# Load core.json / dotfiles
# ------------------------------------------------------------------------------

def load_dhcp():
    """Load and return the parsed core.json; exit if it is missing or invalid."""
    if not DHCP_CONFIG_FILE.exists():
        die(f"Config file not found: {DHCP_CONFIG_FILE}")
    return _load_json(DHCP_CONFIG_FILE)


def wg_interfaces(dhcp_data):
    """Return list of VLAN dicts whose interface name starts with 'wg'."""
    return [v for v in dhcp_data.get("vlans", [])
            if v.get("interface", "").startswith("wg")]


def dotfile_path(iface):
    """Return the path of the peer dotfile for *iface* (SCRIPT_DIR/.vpn-<iface>)."""
    return SCRIPT_DIR / f".vpn-{iface}"


def vpi(vlan):
    """Return the vpn_information dict for a WG VLAN (plain object, not list)."""
    return vlan["vpn_information"]


def server_key_path(iface):
    """Return the path of the server private key (WG_DIR/<iface>.key)."""
    return WG_DIR / f"{iface}.key"


def wg_conf_path(iface):
    """Return the path of the WireGuard server conf (WG_DIR/<iface>.conf)."""
    return WG_DIR / f"{iface}.conf"


def load_peers(iface):
    """Load peers list from the dotfile. Returns [] if file does not exist."""
    path = dotfile_path(iface)
    if not path.exists():
        return []
    return _load_json(path).get("peers", [])


def save_peers(iface, peers):
    """Write peers list to the dotfile with 600 permissions."""
    path = dotfile_path(iface)
    _write_private_file(path, json.dumps({"peers": peers}, indent=2) + "\n")
    chown_to_script_dir_owner(path)


# ------------------------------------------------------------------------------
# IP allocation
# ------------------------------------------------------------------------------

def _vpn_network(gateway):
    """Return the /24 IPv4Network that contains *gateway*."""
    return ipaddress.IPv4Network(f"{gateway}/{VPN_PREFIXLEN}", strict=False)


def next_available_ip(vlan, peers):
    """
    Find the first available peer IP in the wg VLAN subnet, starting from .2.

    Skips the gateway IP and every IP already held by a peer (peer entries
    with a missing or unparsable "ip" are ignored). Scans .2-.254 for the
    first gap and returns it as "<ip>/32". Exits if the subnet is full.
    """
    gateway = vpi(vlan)["gateway"]
    network = _vpn_network(gateway)
    base = int(network.network_address)
    used = {int(ipaddress.IPv4Address(gateway))}
    for peer in peers:
        try:
            used.add(int(ipaddress.IPv4Interface(peer["ip"]).ip))
        except (KeyError, ValueError):
            pass
    for offset in range(2, 255):
        candidate = base + offset
        if candidate not in used:
            return f"{ipaddress.IPv4Address(candidate)}/32"
    die(f"No available IPs in VPN subnet {network} (all .2-.254 allocated).")


def _checked_peer_ip(peer_ip, gateway, peers):
    """Validate a user-supplied peer IP and return it as a '<ip>/32' string.

    Exits if the address is malformed, outside the VPN subnet, equal to the
    gateway, or already assigned to another peer. Any prefix length the user
    typed is discarded: a peer is always a single host, and a wider prefix
    would end up in the server's AllowedIPs for that peer.
    """
    network = _vpn_network(gateway)
    try:
        addr = ipaddress.IPv4Interface(peer_ip).ip
    except ValueError as e:
        die(f"Invalid IP '{peer_ip}': {e}")
    if addr not in network:
        die(f"IP '{peer_ip}' is not within VPN subnet {network}.")
    if addr == ipaddress.IPv4Address(gateway):
        die(f"IP '{peer_ip}' is the server gateway. Choose a different IP.")
    for p in peers:
        try:
            taken = ipaddress.IPv4Interface(p["ip"]).ip
        except (KeyError, ValueError):
            continue  # a damaged dotfile entry cannot conflict
        if taken == addr:
            die(f"IP '{peer_ip}' is already assigned to peer '{p['name']}'.")
    return f"{addr}/32"


# ------------------------------------------------------------------------------
# Key management
# ------------------------------------------------------------------------------

def _derive_public_key(private):
    """Return the public key `wg pubkey` derives from *private*."""
    return subprocess.run(
        ["wg", "pubkey"], input=private,
        capture_output=True, text=True, check=True
    ).stdout.strip()


def generate_server_key(iface):
    """Generate server private key and store at WG_DIR/<iface>.key (600)."""
    WG_DIR.mkdir(exist_ok=True)
    private = run(["wg", "genkey"]).stdout.strip()
    kf = server_key_path(iface)
    _write_private_file(kf, private + "\n")
    print(f"Server private key generated: {kf}")


def get_server_public_key(iface):
    """Derive and return the server's public key from the stored private key."""
    kf = server_key_path(iface)
    if not kf.exists():
        die(f"Server private key not found at {kf}. Run --apply first.")
    return _derive_public_key(kf.read_text().strip())


def generate_peer_keypair():
    """Generate and return (private_key, public_key) for a peer."""
    private = run(["wg", "genkey"]).stdout.strip()
    return private, _derive_public_key(private)


# ------------------------------------------------------------------------------
# Endpoint resolution
# ------------------------------------------------------------------------------

def _first_ddns_host(ddns):
    """Return the hostname of the first enabled, usable provider, or None.

    Only "noip" (first entry of hostnames) and "duckdns" (first entry of
    subdomains, suffixed with .duckdns.org) providers are understood.
    """
    for provider in ddns.get("providers", []):
        if provider.get("enabled") is not True:
            continue
        ptype = provider.get("provider", "").lower()
        if ptype == "noip":
            hostnames = provider.get("hostnames", [])
            if hostnames:
                return hostnames[0]
        elif ptype == "duckdns":
            subdomains = provider.get("subdomains", [])
            if subdomains:
                return f"{subdomains[0]}.duckdns.org"
    return None


def resolve_endpoint(listen_port):
    """
    Resolve the public endpoint for client configs.

    Resolution order:
      1. First enabled provider hostname from ddns.json
      2. Manual entry prompt if ddns.json is missing, unreadable, or has no
         enabled provider

    The user is always shown the resolved value and given the opportunity to
    edit it before it is used. An entry without ":" gets ":<listen_port>"
    appended. Returns the endpoint string.
    """
    import readline

    candidate = None
    if DDNS_CONFIG_FILE.exists():
        try:
            with open(DDNS_CONFIG_FILE, encoding="utf-8") as f:
                ddns = json.load(f)
        except (OSError, ValueError) as e:
            # A broken ddns.json must not block peer creation; fall through
            # to manual entry.
            print(f"Could not read {DDNS_CONFIG_FILE}: {e}")
            ddns = {}
        host = _first_ddns_host(ddns) if isinstance(ddns, dict) else None
        if host:
            candidate = f"{host}:{listen_port}"
        else:
            print("No enabled DDNS provider found in ddns.json.")
    else:
        print(f"ddns.json not found at {DDNS_CONFIG_FILE}.")

    if candidate:
        prompt = "Public endpoint (from ddns.json): "
    else:
        print("Please enter the public endpoint manually.")
        prompt = "Public endpoint (hostname:port): "

    def prefill():
        # Pre-populate the input line so the user can accept or edit it.
        if candidate:
            readline.insert_text(candidate)

    while True:
        try:
            readline.set_startup_hook(prefill)
            entry = input(prompt).strip()
        finally:
            readline.set_startup_hook(None)
        if not entry:
            print(" Endpoint cannot be empty.")
            continue
        if ":" not in entry:
            entry = f"{entry}:{listen_port}"
        return entry


# ------------------------------------------------------------------------------
# Split-tunnel route computation
# ------------------------------------------------------------------------------

def split_tunnel_routes(dhcp_data):
    """
    Return a list of CIDR strings for all VLANs defined in core.json.

    WG VLANs use vpn_information.gateway to derive their /24 subnet; all
    other VLANs use dhcp.subnet / dhcp.subnet_mask.
    Used as AllowedIPs in client configs when split_tunnel is true.
    """
    routes = []
    for v in dhcp_data.get("vlans", []):
        if v.get("interface", "").startswith("wg"):
            net = _vpn_network(vpi(v)["gateway"])
        else:
            d = v["dhcp"]
            net = ipaddress.IPv4Network(
                f"{d['subnet']}/{d['subnet_mask']}", strict=False
            )
        routes.append(str(net))
    return routes


def _client_settings(vlan, dhcp_data, split_tunnel):
    """Return (allowed_ips, dns, domain, mtu) for a client config on *vlan*.

    dns and mtu come from vpn_information.explicit_overrides when set there;
    otherwise dns is the gateway and mtu is sensed from the WAN interface.
    """
    info = vpi(vlan)
    overrides = info.get("explicit_overrides", {})
    if split_tunnel:
        allowed_ips = ", ".join(split_tunnel_routes(dhcp_data))
    else:
        allowed_ips = "0.0.0.0/0"
    dns = overrides.get("dns_server", "") or info["gateway"]
    domain = info.get("domain", "")
    mtu_override = overrides.get("mtu", "")
    mtu = int(mtu_override) if mtu_override else sense_mtu(dhcp_data)
    return allowed_ips, dns, domain, mtu


# ------------------------------------------------------------------------------
# Client config
# ------------------------------------------------------------------------------

def build_client_conf(peer, private_key, server_public_key, endpoint,
                      allowed_ips, dns, domain, mtu):
    """Return the text of a WireGuard client config for *peer*.

    *domain*, when non-empty, is appended to the DNS line after the server.
    The result ends with a newline.
    """
    dns_line = f"DNS = {dns}, {domain}" if domain else f"DNS = {dns}"
    return "\n".join([
        "[Interface]",
        f"PrivateKey = {private_key}",
        f"Address = {peer['ip']}",
        dns_line,
        f"MTU = {mtu}",
        "",
        "[Peer]",
        f"PublicKey = {server_public_key}",
        f"Endpoint = {endpoint}",
        f"AllowedIPs = {allowed_ips}",
        f"PersistentKeepalive = {KEEPALIVE}",
        "",
    ])


def write_client_conf(peer, private_key, server_public_key, endpoint,
                      allowed_ips, dns, domain, mtu):
    """Write SCRIPT_DIR/vpn-client-<name>.conf (600) and return its path.

    NOTE(review): the file name is not namespaced by interface, so peers with
    the same name on different interfaces share one client config path.
    """
    conf_path = SCRIPT_DIR / f"vpn-client-{peer['name']}.conf"
    content = build_client_conf(peer, private_key, server_public_key,
                                endpoint, allowed_ips, dns, domain, mtu)
    _write_private_file(conf_path, content)
    chown_to_script_dir_owner(conf_path)
    return conf_path


# ------------------------------------------------------------------------------
# WireGuard server conf
# ------------------------------------------------------------------------------

def build_wg_conf(vlan, peers, server_private_key):
    """Return the text of the server-side WireGuard conf for *vlan*.

    Only peers whose "enabled" field is exactly True get a [Peer] section.
    """
    info = vpi(vlan)
    gateway = info["gateway"]
    server_ip = f"{gateway}/{_vpn_network(gateway).prefixlen}"
    lines = [
        "# Generated by vpn.py -- do not edit manually.",
        "# Run: sudo python3 vpn.py --apply",
        "",
        "[Interface]",
        f"PrivateKey = {server_private_key}",
        f"Address = {server_ip}",
        f"ListenPort = {info['listen_port']}",
        "",
    ]
    for peer in peers:
        if peer.get("enabled") is True:
            lines += [
                f"# {peer['name']}",
                "[Peer]",
                f"PublicKey = {peer['public_key']}",
                f"AllowedIPs = {peer['ip']}",
                "",
            ]
    return "\n".join(lines)


# ------------------------------------------------------------------------------
# Live peer sync
# ------------------------------------------------------------------------------

def sync_peers_live(iface, peers):
    """
    Sync live WireGuard peers to match the dotfile state without a service
    restart.

    Adds peers present in the dotfile (enabled) but not live; removes peers
    live but not in the enabled dotfile set. Does nothing if
    `wg show <iface> dump` fails (interface not up yet).
    """
    result = run(["wg", "show", iface, "dump"], check=False)
    if result.returncode != 0:
        return  # interface not up yet

    # First dump line is the server interface itself; each following line is
    # one peer, tab-separated, with the public key in the first column.
    dump_lines = result.stdout.strip().splitlines()
    live_keys = {line.split("\t")[0] for line in dump_lines[1:]}

    enabled_peers = {
        p["public_key"]: p for p in peers if p.get("enabled") is True
    }
    dotfile_keys = set(enabled_peers)

    for key in dotfile_keys - live_keys:
        peer = enabled_peers[key]
        run(["wg", "set", iface, "peer", key, "allowed-ips", peer["ip"]])
        print(f" Added peer: {peer['name']} ({peer['ip']})")

    for key in live_keys - dotfile_keys:
        run(["wg", "set", iface, "peer", key, "remove"])
        print(f" Removed peer: {key[:16]}...")


# ------------------------------------------------------------------------------
# Interface selection
# ------------------------------------------------------------------------------

def validate_wg_vlans(wg_vlans):
    """Die with a clear message if any wg VLAN is missing a valid vpn_information block."""
    for vlan in wg_vlans:
        iface = vlan.get("interface", "?")
        info = vlan.get("vpn_information")
        if not isinstance(info, dict):
            die(f"Interface '{iface}' is missing a vpn_information block in core.json. "
                f"Add: \"vpn_information\": {{\"listen_port\": 51820, \"gateway\": \"...\"}}")
        if not isinstance(info.get("listen_port"), int):
            die(f"Interface '{iface}' vpn_information is missing a valid listen_port in core.json.")
        if not info.get("gateway"):
            die(f"Interface '{iface}' vpn_information is missing gateway in core.json.")


def pick_wg_interface(wg_vlans):
    """
    If only one wg interface exists, return it immediately.
    Otherwise, print a numbered list and prompt the user to pick.
    """
    if len(wg_vlans) == 1:
        return wg_vlans[0]
    print("Available WireGuard interfaces:")
    for i, vlan in enumerate(wg_vlans, 1):
        lp = vpi(vlan)["listen_port"]
        print(f" {i}. {vlan['interface']} ({vlan.get('name', '?')}, port {lp})")
    print()
    while True:
        choice = input("Select interface number: ").strip()
        try:
            idx = int(choice) - 1
        except ValueError:
            idx = -1
        if 0 <= idx < len(wg_vlans):
            return wg_vlans[idx]
        print(" Invalid selection.")


def _peer_name_error(name, peers, iface, current=None):
    """Return a message explaining why *name* is unusable, or None if it is fine.

    *current* is the peer being renamed, if any; it is excluded from the
    duplicate check.
    """
    if not name:
        return " Name cannot be empty."
    if os.sep in name:
        # The name becomes part of the client config file name.
        return f" Name cannot contain '{os.sep}'."
    if any(p["name"] == name for p in peers if p is not current):
        return f" A peer named '{name}' already exists for {iface}."
    return None


def _print_apply_hint():
    """Remind the user that dotfile changes need --apply to take effect."""
    print()
    print(" To apply changes to WireGuard, run:")
    print(" sudo python3 vpn.py --apply")
    print()


# ------------------------------------------------------------------------------
# --add-peer
# ------------------------------------------------------------------------------

def cmd_add_peer(dhcp_data):
    """Interactively add a peer: record it in the dotfile and write its client config."""
    check_root()
    check_wireguard_tools()
    wg_vlans = wg_interfaces(dhcp_data)
    vlan = pick_wg_interface(wg_vlans)
    iface = vlan["interface"]
    peers = load_peers(iface)

    # -- Resolve endpoint -------------------------------------------------------
    endpoint = resolve_endpoint(vpi(vlan)["listen_port"])

    # -- Peer name --------------------------------------------------------------
    print()
    while True:
        name = input("Peer name (e.g. norman-laptop): ").strip()
        problem = _peer_name_error(name, peers, iface)
        if problem is None:
            break
        print(problem)

    # -- Peer IP ----------------------------------------------------------------
    prefill = next_available_ip(vlan, peers)
    ip_input = input(f"Peer IP [{prefill}]: ").strip()
    peer_ip = _checked_peer_ip(ip_input or prefill, vpi(vlan)["gateway"], peers)

    # -- Generate keypair -------------------------------------------------------
    print(f"\nGenerating keypair for '{name}'...")
    private_key, public_key = generate_peer_keypair()

    # -- Ensure server key exists -----------------------------------------------
    if not server_key_path(iface).exists():
        print(f"Server private key not found for {iface} -- generating now...")
        generate_server_key(iface)
    server_public_key = get_server_public_key(iface)

    # -- Split tunnel prompt ----------------------------------------------------
    print()
    st_input = input(
        "Split tunnel? Route only VPN subnets (not all traffic) through WireGuard. [Y/n]: "
    ).strip().lower()
    split_tunnel = st_input not in ("n", "no")
    allowed_ips, dns, domain, mtu = _client_settings(vlan, dhcp_data, split_tunnel)

    # -- Add peer to dotfile ----------------------------------------------------
    new_peer = {
        "name": name,
        "ip": peer_ip,
        "public_key": public_key,
        "split_tunnel": split_tunnel,
        "enabled": True,
    }
    peers.append(new_peer)
    save_peers(iface, peers)
    print(f"Peer '{name}' added to {dotfile_path(iface).name} (ip: {peer_ip}).")

    # -- Write client config ----------------------------------------------------
    # The private key is only ever persisted inside the client config file.
    conf_path = write_client_conf(new_peer, private_key, server_public_key,
                                  endpoint, allowed_ips, dns, domain, mtu)
    del private_key

    # -- Instructions -----------------------------------------------------------
    print()
    print("=" * 68)
    print(f" Client config written: {conf_path}")
    print()
    print(" NEXT STEPS:")
    print(f" 1. Transfer {conf_path.name} to '{name}' by secure means")
    print(" (encrypted email, USB drive, etc.).")
    print(" 2. The recipient imports it into their WireGuard app.")
    print(f" 3. Delete {conf_path} from this server once transferred.")
    print()
    print(" WARNING: This file contains the peer's private key.")
    print(" Do not leave it on this server longer than necessary.")
    print("=" * 68)
    _print_apply_hint()


# ------------------------------------------------------------------------------
# --manage-peers
# ------------------------------------------------------------------------------

def _rename_peer(iface, peer, peers):
    """Prompt for a new name, rename any existing client config, save the dotfile."""
    while True:
        new_name = input(f"New name [{peer['name']}]: ").strip()
        problem = _peer_name_error(new_name, peers, iface, current=peer)
        if problem is None:
            break
        print(problem)
    old_conf = SCRIPT_DIR / f"vpn-client-{peer['name']}.conf"
    new_conf = SCRIPT_DIR / f"vpn-client-{new_name}.conf"
    if old_conf.exists():
        old_conf.rename(new_conf)
        print(f" Renamed client config: {old_conf.name} -> {new_conf.name}")
    peer["name"] = new_name
    save_peers(iface, peers)
    print(f"Peer renamed to '{new_name}'.")


def _regenerate_peer_keys(iface, peer, vlan, peers, dhcp_data):
    """Give *peer* a fresh keypair, save it, and write a new client config."""
    kf = server_key_path(iface)
    if not kf.exists():
        die(f"Server private key not found at {kf}. Run --apply first.")
    endpoint = resolve_endpoint(vpi(vlan)["listen_port"])
    print(f"\nRegenerating keypair for '{peer['name']}'...")
    private_key, public_key = generate_peer_keypair()
    server_public_key = get_server_public_key(iface)
    allowed_ips, dns, domain, mtu = _client_settings(
        vlan, dhcp_data, peer.get("split_tunnel", True)
    )
    peer["public_key"] = public_key
    save_peers(iface, peers)
    conf_path = write_client_conf(peer, private_key, server_public_key,
                                  endpoint, allowed_ips, dns, domain, mtu)
    del private_key
    print()
    print("=" * 68)
    print(f" New client config written: {conf_path}")
    print()
    print(" WARNING: This file contains the peer's private key.")
    print(" The previous client config is now invalid.")
    print(" Transfer the new config and delete it from this server.")
    print("=" * 68)


def _delete_peer(iface, peer, peers):
    """Confirm and delete *peer* from the dotfile. Returns True if deleted."""
    confirm = input(f"\nDelete peer '{peer['name']}' from {iface}? [y/N]: ").strip().lower()
    if confirm not in ("y", "yes"):
        print("Cancelled.")
        return False
    peers[:] = [p for p in peers if p["name"] != peer["name"]]
    save_peers(iface, peers)
    conf_path = SCRIPT_DIR / f"vpn-client-{peer['name']}.conf"
    if conf_path.exists():
        print(f" NOTE: Client config {conf_path.name} still exists on this server.")
        print(" It is now invalid and should be deleted.")
    print(f"Peer '{peer['name']}' deleted from {iface}.")
    return True


def cmd_list_peers(dhcp_data):
    """List the peers of every WireGuard interface and rename, re-key or delete one."""
    check_root()
    check_wireguard_tools()
    wg_vlans = wg_interfaces(dhcp_data)

    # -- Collect all peers across all interfaces --------------------------------
    # Each entry: (iface, peer_dict, vlan_dict, peers_list)
    all_entries = []
    for vlan in wg_vlans:
        iface = vlan["interface"]
        peers = load_peers(iface)
        all_entries.extend((iface, peer, vlan, peers) for peer in peers)

    if not all_entries:
        print("No peers found across any WireGuard interface.")
        return

    # -- List peers -------------------------------------------------------------
    print("Peers:")
    for i, (iface, peer, _, _) in enumerate(all_entries, 1):
        # Same test as --apply uses, so the listing matches what is deployed.
        status = "enabled" if peer.get("enabled") is True else "disabled"
        print(f" {i}. [{iface}] {peer['name']} {peer['ip']} [{status}]")
    print()

    while True:
        choice = input("Select peer number (or Enter to cancel): ").strip()
        if not choice:
            return
        try:
            idx = int(choice) - 1
        except ValueError:
            idx = -1
        if 0 <= idx < len(all_entries):
            break
        print(" Invalid selection.")

    iface, peer, vlan, peers = all_entries[idx]
    print(f"\nSelected: {peer['name']} on {iface}")
    print(" 1. Rename")
    print(" 2. Regenerate keys")
    print(" 3. Delete")
    print(" 4. Cancel")
    print()
    action = input("Select action: ").strip()

    if action == "1":
        _rename_peer(iface, peer, peers)
    elif action == "2":
        _regenerate_peer_keys(iface, peer, vlan, peers, dhcp_data)
    elif action == "3":
        if not _delete_peer(iface, peer, peers):
            return
    elif action == "4":
        print("Cancelled.")
        return
    else:
        die("Invalid action.")

    _print_apply_hint()


# ------------------------------------------------------------------------------
# --apply
# ------------------------------------------------------------------------------

def _existing_listen_port(conf_file):
    """Return the ListenPort value in *conf_file* as a string, or None."""
    if not conf_file.exists():
        return None
    for line in conf_file.read_text().splitlines():
        if line.startswith("ListenPort"):
            return line.partition("=")[2].strip()
    return None


def cmd_apply(dhcp_data):
    """Write the server conf for every wg interface, then start or sync its service.

    Finishes by running core.py --apply (if present next to this script) so
    the VPN firewall rules are loaded.
    """
    check_root()
    check_wireguard_tools()
    wg_vlans = wg_interfaces(dhcp_data)

    for vlan in wg_vlans:
        iface = vlan["interface"]
        print(f"-- {iface} " + "-" * 58)
        peers = load_peers(iface)

        # -- Ensure server key ---------------------------------------------------
        kf = server_key_path(iface)
        if not kf.exists():
            print(f" Generating server private key for {iface}...")
            generate_server_key(iface)
        else:
            print(f" Using existing server key: {kf}")
        server_private_key = kf.read_text().strip()
        server_public_key = get_server_public_key(iface)

        # -- Write wg conf -------------------------------------------------------
        WG_DIR.mkdir(exist_ok=True)
        conf_file = wg_conf_path(iface)
        new_conf = build_wg_conf(vlan, peers, server_private_key)
        listen_port = vpi(vlan)["listen_port"]

        # A changed listen port cannot be applied by the live peer sync, so
        # remember whether the conf on disk used a different one.
        old_port = _existing_listen_port(conf_file)
        port_changed = old_port is not None and old_port != str(listen_port)

        _write_private_file(conf_file, new_conf)
        print(f" Written: {conf_file}")

        # -- Start or sync service -----------------------------------------------
        svc = f"wg-quick@{iface}"
        result = run(["systemctl", "is-active", svc], check=False)
        if result.stdout.strip() == "active":
            if port_changed:
                print(f" Listen port changed -- restarting {svc}...")
                result2 = run(["systemctl", "restart", svc], check=False)
                if result2.returncode != 0:
                    die(f"Failed to restart {svc}:\n{result2.stderr.strip()}")
                print(f" Service {svc} restarted.")
            else:
                print(f" Service {svc} is active -- syncing peers live...")
                sync_peers_live(iface, peers)
        else:
            result2 = run(["systemctl", "enable", "--now", svc], check=False)
            if result2.returncode != 0:
                result3 = run(["systemctl", "reload-or-restart", svc], check=False)
                if result3.returncode != 0:
                    die(f"Failed to start {svc}:\n{result3.stderr.strip()}")
            print(f" Service {svc} enabled and started.")

        # -- Summary -------------------------------------------------------------
        enabled_peers = [p for p in peers if p.get("enabled") is True]
        print(f" Server public key: {server_public_key}")
        print(f" Listen port: UDP {listen_port}")
        print(f" Enabled peers: {len(enabled_peers)}")
        for p in enabled_peers:
            print(f" {p['ip']:<22} {p['name']}")
        print()

    # -- Apply core config to pick up VPN firewall rules -------------------------
    core_py = SCRIPT_DIR / "core.py"
    if core_py.exists():
        print("-- Applying core config (core.py --apply) ----------------------------")
        result = subprocess.run(
            [sys.executable, str(core_py), "--apply"], capture_output=False
        )
        if result.returncode != 0:
            print("WARNING: core.py --apply returned non-zero. Check output above.")
    else:
        print(f"WARNING: {core_py} not found -- run core.py --apply manually to load VPN firewall rules.")


# ------------------------------------------------------------------------------
# --disable
# ------------------------------------------------------------------------------

def cmd_disable(dhcp_data):
    """Stop and disable the wg-quick service of every WireGuard interface."""
    check_root()
    for vlan in wg_interfaces(dhcp_data):
        svc = f"wg-quick@{vlan['interface']}"
        result = run(["systemctl", "disable", "--now", svc], check=False)
        if result.returncode != 0:
            print(f"WARNING: {svc} may not have been running:\n{result.stderr.strip()}")
        else:
            print(f"WireGuard service {svc} stopped and disabled.")


# ------------------------------------------------------------------------------
# --status
# ------------------------------------------------------------------------------

def cmd_status(dhcp_data):
    """Print systemd state and, for active interfaces, a `wg show` summary."""
    check_root()
    wg_vlans = wg_interfaces(dhcp_data)
    print(f" {'UNIT':<45} {'ACTIVE':<12} {'ENABLED'}")
    print(f" {'-'*45} {'-'*12} {'-'*10}")
    for vlan in wg_vlans:
        iface = vlan["interface"]
        svc = f"wg-quick@{iface}"
        active = run(["systemctl", "is-active", svc], check=False).stdout.strip()
        enabled = run(["systemctl", "is-enabled", svc], check=False).stdout.strip()
        active_sym = "✓" if active == "active" else "✗"
        enabled_sym = "✓" if enabled == "enabled" else "✗"
        print(f" {svc:<45} {active_sym} {active:<10} {enabled_sym} {enabled}")
        if active != "active":
            continue
        result = run(["wg", "show", iface], check=False)
        if result.returncode != 0:
            continue
        info = {}
        for line in result.stdout.splitlines():
            line = line.strip()
            if line.startswith("public key:"):
                info["pubkey"] = line.split(":", 1)[1].strip()
            elif line.startswith("listening port:"):
                info["port"] = line.split(":", 1)[1].strip()
            elif line.startswith("peer:"):
                info["peers"] = info.get("peers", 0) + 1
        if "pubkey" in info:
            print(f" public key: {info['pubkey']}")
        if "port" in info:
            print(f" listening port: {info['port']}")
        enabled_peers = [p for p in load_peers(iface) if p.get("enabled") is True]
        # NOTE(review): the second figure counts "peer:" sections in
        # `wg show`, i.e. peers loaded on the live interface, not peers with
        # a recent handshake -- the "connected" label may overstate it.
        print(f" peers: {len(enabled_peers)} configured, {info.get('peers', 0)} connected")


# ------------------------------------------------------------------------------
# --view-peers
# ------------------------------------------------------------------------------

def cmd_logs(dhcp_data):
    """Print a per-peer table (endpoint, last handshake, RX/TX) for each interface."""
    check_root()
    wg_vlans = wg_interfaces(dhcp_data)
    now = datetime.now(timezone.utc)

    for vlan in wg_vlans:
        iface = vlan["interface"]
        peers = load_peers(iface)
        peer_by_key = {p["public_key"]: p["name"] for p in peers}
        print(f"-- {iface} " + "-" * 58)

        result = run(["wg", "show", iface, "dump"], check=False)
        if result.returncode != 0:
            print(f" WireGuard interface '{iface}' is not up.")
            print()
            continue

        # First line is the server interface itself.
        peer_lines = result.stdout.strip().splitlines()[1:]
        if not peer_lines:
            print(" No peers currently configured.")
            print()
            continue

        print(f" {'PEER':<22} {'IP':<20} {'ENDPOINT':<26} {'LAST HANDSHAKE':<22} {'RX':<12} {'TX'}")
        print(" " + "-" * 106)
        for ln in peer_lines:
            # Columns used: 0 public key, 2 endpoint, 3 allowed IPs,
            # 4 last-handshake epoch, 5 RX bytes, 6 TX bytes.
            parts = ln.split("\t")
            if len(parts) < 6:
                continue
            pub_key = parts[0]
            endpoint = parts[2] if parts[2] != "(none)" else "not connected"
            allowed_ips = parts[3]
            last_hs_ts = parts[4]
            rx_bytes = int(parts[5])
            tx_bytes = int(parts[6]) if len(parts) > 6 else 0
            name = peer_by_key.get(pub_key, pub_key[:12] + "...")

            if last_hs_ts == "0":
                last_hs = "never"
            else:
                hs_dt = datetime.fromtimestamp(int(last_hs_ts), tz=timezone.utc)
                last_hs = _fmt_age(int((now - hs_dt).total_seconds()))

            rx_str = _fmt_bytes(rx_bytes)
            tx_str = _fmt_bytes(tx_bytes)
            display_ip = allowed_ips.split(",")[0].strip()
            print(f" {name:<22} {display_ip:<20} {endpoint:<26} {last_hs:<22} {rx_str:<12} {tx_str}")
        print()


# ------------------------------------------------------------------------------
# Main
# ------------------------------------------------------------------------------

def main():
    """Parse the command line and dispatch to the first selected command."""
    parser = argparse.ArgumentParser(
        description="Manage WireGuard VPN server and peers",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "examples:\n"
            " sudo python3 vpn.py --add-peer Add a new peer interactively\n"
            " sudo python3 vpn.py --manage-peers List and manage existing peers\n"
            " sudo python3 vpn.py --apply Write WireGuard config and sync peers\n"
            " sudo python3 vpn.py --disable Stop WireGuard on all interfaces\n"
            " sudo python3 vpn.py --status Show WireGuard service and interface status\n"
            " sudo python3 vpn.py --view-peers Show per-peer handshake and traffic stats\n"
        )
    )
    parser.add_argument("--add-peer", action="store_true", help="Add a new peer interactively")
    parser.add_argument("--manage-peers", action="store_true", help="List and manage existing peers")
    parser.add_argument("--apply", action="store_true", help="Write WireGuard config and sync peers")
    parser.add_argument("--disable", action="store_true", help="Stop WireGuard on all interfaces")
    parser.add_argument("--status", action="store_true", help="Show WireGuard service and interface status")
    parser.add_argument("--view-peers", action="store_true", help="Show per-peer handshake and traffic stats")
    args = parser.parse_args()

    if not any([args.add_peer, args.manage_peers, args.apply,
                args.disable, args.status, args.view_peers]):
        parser.print_help()
        sys.exit(0)

    dhcp_data = load_dhcp()
    wg_vlans = wg_interfaces(dhcp_data)
    if not wg_vlans:
        die("No WireGuard interfaces (wg*) found in core.json.")
    validate_wg_vlans(wg_vlans)

    if args.add_peer:
        cmd_add_peer(dhcp_data)
    elif args.manage_peers:
        cmd_list_peers(dhcp_data)
    elif args.apply:
        cmd_apply(dhcp_data)
    elif args.disable:
        cmd_disable(dhcp_data)
    elif args.status:
        cmd_status(dhcp_data)
    elif args.view_peers:
        cmd_logs(dhcp_data)


if __name__ == "__main__":
    main()