UI improvement

This commit is contained in:
Matthew Grotke 2026-05-18 14:38:23 -04:00
parent 575edc836d
commit 9a272ee959
16 changed files with 2477 additions and 1604 deletions

View file

@ -4,7 +4,7 @@ core.py -- Apply core.json to systemd-networkd, per-VLAN dnsmasq instances, and
Each VLAN defined in core.json gets its own dnsmasq instance that handles
both DHCP and DNS for that VLAN. WireGuard VLANs get a DNS-only instance
(no DHCP, since WireGuard peers get IPs from vpn.py).
(no DHCP, since peers have statically assigned IPs).
Each instance binds exclusively to its VLAN gateway IP on port 53, so
instances do not conflict with each other or with the system dnsmasq.service,
@ -117,6 +117,8 @@ TIMER_SVC_FILE = SYSTEMD_DIR / f"{TIMER_NAME}.service"
RESOLV_CONF = Path("/etc/resolv.conf")
NAT_SERVICE_NAME = "core-nat"
NAT_SERVICE_FILE = SYSTEMD_DIR / f"{NAT_SERVICE_NAME}.service"
WG_DIR = Path("/etc/wireguard")
WG_KEEPALIVE = 25
log = None
@ -180,13 +182,15 @@ def check_root():
if os.geteuid() != 0:
die("This script must be run as root (sudo).")
def prefix_to_dotted(n):
mask = (0xFFFFFFFF << (32 - int(n))) & 0xFFFFFFFF
return '.'.join(str((mask >> (8 * i)) & 0xFF) for i in (3, 2, 1, 0))
def network_for(vlan):
d = vlan["dhcp"]
return ipaddress.IPv4Network(f"{d['subnet']}/{d['subnet_mask']}", strict=False)
return ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
def lowest_quartet_ip(vlan):
"""Return the server_identity IP with the lowest value in the last octet.
Only called for non-WG VLANs which have a server_identities list."""
"""Return the server_identity IP with the lowest value in the last octet."""
identities = vlan.get("server_identities", [])
ips = []
for s in identities:
@ -202,26 +206,28 @@ def resolve_vlan_options(vlan):
"""
Resolve gateway, dns_server, and ntp_server for a VLAN.
For WG VLANs: gateway comes directly from vpn_information.gateway.
dns_server defaults to gateway unless explicit_overrides.dns_server
is set. ntp_server is None -- WireGuard has no DHCP so NTP cannot
be advertised to peers.
For both WG and non-WG VLANs: gateway defaults to the lowest-last-octet
server_identity IP unless overridden in explicit_overrides. The gateway
override must be one of the server_identity IPs.
For non-WG VLANs: all three default to the lowest-last-octet
server_identity IP unless overridden in dhcp.explicit_overrides.
WG VLANs: ntp_server is None (WireGuard has no DHCP so NTP cannot be
advertised to peers). Overrides live in vpn_information.explicit_overrides.
Non-WG VLANs: overrides live in dhcp_information.explicit_overrides.
Returns a dict with keys: gateway, dns_server, ntp_server.
"""
if is_wg(vlan):
vpi = vlan["vpn_information"]
gateway = vpi["gateway"]
overrides = vpi.get("explicit_overrides", {})
default = lowest_quartet_ip(vlan) or str(next(network_for(vlan).hosts()))
gateway = overrides.get("gateway", "") or default
dns = overrides.get("dns_server", "") or gateway
return {
"gateway": gateway,
"dns_server": dns,
"ntp_server": None,
}
overrides = vlan.get("dhcp", {}).get("explicit_overrides", {})
overrides = vlan.get("dhcp_information", {}).get("explicit_overrides", {})
default = lowest_quartet_ip(vlan)
return {
"gateway": overrides.get("gateway", "") or default,
@ -233,7 +239,27 @@ def is_physical(vlan):
return vlan["vlan_id"] == 1
def is_wg(vlan):
return vlan.get("interface", "").startswith("wg")
return vlan.get("is_vpn", False)
def inject_interfaces(data):
"""Compute and inject the 'interface' field for every VLAN from is_vpn + vlan_id.
is_vpn=False (regular VLAN):
vlan_id 1 general.lan_interface (e.g. enp6s0)
vlan_id N lan_interface.N (e.g. enp6s0.10)
is_vpn=True (WireGuard VLAN):
1st WG VLAN wg0, 2nd wg1, etc. (order in vlans array)
"""
lan = data.get("general", {}).get("lan_interface", "eth0")
wg_idx = 0
for vlan in data.get("vlans", []):
if vlan.get("is_vpn"):
vlan["interface"] = f"wg{wg_idx}"
wg_idx += 1
else:
vid = vlan.get("vlan_id", 1)
vlan["interface"] = lan if vid == 1 else f"{lan}.{vid}"
def networkd_stem(vlan):
return f"10-router-{vlan['name']}"
@ -298,6 +324,7 @@ def load_config():
# ===================================================================
def validate_config(data):
inject_interfaces(data)
errors = []
seen_vlan_ids = {}
seen_interfaces = {}
@ -308,11 +335,15 @@ def validate_config(data):
if not data.get("upstream_dns", {}).get("upstream_servers"):
errors.append("upstream_dns.upstream_servers is missing or empty.")
# -- WAN interface ---------------------------------------------------------
wan = data.get("general", {}).get("wan_interface", "")
# -- WAN / LAN interfaces --------------------------------------------------
gen = data.get("general", {})
wan = gen.get("wan_interface", "")
lan = gen.get("lan_interface", "")
if not wan:
errors.append("general.wan_interface is missing or empty.")
else:
if not lan:
errors.append("general.lan_interface is missing or empty.")
if wan and lan:
available_interfaces = set()
try:
result = subprocess.run(["ip", "link", "show"], capture_output=True, text=True)
@ -320,8 +351,13 @@ def validate_config(data):
available_interfaces = {i.split("@")[0] for i in available_interfaces}
except Exception:
pass
if available_interfaces and wan not in available_interfaces:
errors.append(f"general.wan_interface: '{wan}' does not exist on this system.")
if available_interfaces:
if wan not in available_interfaces:
errors.append(f"general.wan_interface: '{wan}' does not exist on this system.")
if lan not in available_interfaces:
errors.append(f"general.lan_interface: '{lan}' does not exist on this system.")
if wan == lan:
errors.append(f"general.wan_interface and general.lan_interface must be different (both set to '{wan}').")
# -- Blocklist library -----------------------------------------------------
blocklists_by_name = {}
@ -370,9 +406,11 @@ def validate_config(data):
errors.append(f"{label}: mdns_reflection must be false for WireGuard interfaces.")
if is_wg(vlan):
# -- vpn_information -----------------------------------------------
vpi = vlan.get("vpn_information")
if not isinstance(vpi, dict):
errors.append(f"{label}: vpn_information must be a plain object.")
vpi = {}
else:
lp = vpi.get("listen_port")
if not isinstance(lp, int) or not (1 <= lp <= 65535):
@ -382,160 +420,234 @@ def validate_config(data):
f"'{seen_listen_ports[lp]}'.")
else:
seen_listen_ports[lp] = name
gw = vpi.get("gateway", "")
if not gw:
errors.append(f"{label}: vpn_information.gateway is required.")
# -- subnet/subnet_mask --------------------------------------------
for field in ("subnet", "subnet_mask"):
if not vlan.get(field):
errors.append(f"{label}: missing required field '{field}'.")
wg_net = None
if vlan.get("subnet") and vlan.get("subnet_mask"):
try:
wg_net = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
vlan_networks[iface] = wg_net
except ValueError as e:
errors.append(f"{label}: invalid subnet/subnet_mask: {e}")
# -- server_identities ---------------------------------------------
if not vlan.get("server_identities"):
errors.append(f"{label}: server_identities is empty or missing.")
identity_ips = []
for idx, ident in enumerate(vlan.get("server_identities", [])):
ip_str = ident.get("ip", "")
ilabel = f"{label} server_identities[{idx}] '{ident.get('description', '?')}'"
if not ip_str:
errors.append(f"{ilabel}: missing 'ip' field.")
continue
try:
ip = ipaddress.IPv4Address(ip_str)
if wg_net and ip not in wg_net:
errors.append(f"{ilabel}: ip '{ip_str}' is not within subnet {wg_net}.")
else:
identity_ips.append(ip)
except ValueError:
errors.append(f"{ilabel}: ip '{ip_str}' is not a valid IPv4 address.")
# -- vpn_information.explicit_overrides ----------------------------
eo = vpi.get("explicit_overrides", {}) if isinstance(vpi, dict) else {}
if not isinstance(eo, dict):
errors.append(f"{label}: vpn_information.explicit_overrides must be a plain object.")
else:
gw = eo.get("gateway", "")
if gw:
try:
gw_ip = ipaddress.IPv4Address(gw)
if identity_ips and gw_ip not in identity_ips:
errors.append(
f"{label}: vpn_information.explicit_overrides.gateway '{gw}' does not match "
f"any server_identity IP. Must be one of: "
f"{[str(ip) for ip in identity_ips]}."
)
except ValueError:
errors.append(f"{label}: vpn_information.explicit_overrides.gateway '{gw}' is not a valid IPv4 address.")
dns = eo.get("dns_server", "")
if dns:
try:
ipaddress.IPv4Address(dns)
except ValueError:
errors.append(f"{label}: vpn_information.explicit_overrides.dns_server '{dns}' is not a valid IPv4 address.")
mtu = eo.get("mtu", "")
if mtu:
try:
m = int(mtu)
if not (576 <= m <= 9000):
errors.append(f"{label}: vpn_information.explicit_overrides.mtu {mtu} is out of valid range (576-9000).")
except (ValueError, TypeError):
errors.append(f"{label}: vpn_information.explicit_overrides.mtu '{mtu}' is not a valid integer.")
# -- peers ---------------------------------------------------------
seen_peer_names = {}
seen_peer_ips = {}
for pidx, peer in enumerate(vlan.get("peers", [])):
pname = peer.get("name", "")
plabel = f"{label} peer[{pidx}] '{pname}'"
if not pname:
errors.append(f"{plabel}: missing 'name' field.")
elif pname in seen_peer_names:
errors.append(f"{plabel}: duplicate peer name '{pname}'.")
else:
seen_peer_names[pname] = pidx
if not peer.get("public_key"):
errors.append(f"{plabel}: missing 'public_key' field.")
pip_str = peer.get("ip", "")
if not pip_str:
errors.append(f"{plabel}: missing 'ip' field.")
else:
try:
ipaddress.IPv4Address(gw)
pip = ipaddress.IPv4Address(pip_str)
if wg_net and pip not in wg_net:
errors.append(f"{plabel}: ip '{pip_str}' is not within subnet {wg_net}.")
if pip in identity_ips:
errors.append(f"{plabel}: ip '{pip_str}' conflicts with a server_identity.")
if pip_str in seen_peer_ips:
errors.append(
f"{plabel}: duplicate peer ip '{pip_str}' "
f"(also used by peer '{seen_peer_ips[pip_str]}')."
)
else:
seen_peer_ips[pip_str] = pname
except ValueError:
errors.append(f"{label}: vpn_information.gateway '{gw}' is not a valid IPv4 address.")
eo = vpi.get("explicit_overrides", {})
if not isinstance(eo, dict):
errors.append(f"{label}: vpn_information.explicit_overrides must be a plain object.")
else:
dns = eo.get("dns_server", "")
if dns:
try:
ipaddress.IPv4Address(dns)
except ValueError:
errors.append(f"{label}: vpn_information.explicit_overrides.dns_server '{dns}' is not a valid IPv4 address.")
mtu = eo.get("mtu", "")
if mtu:
try:
m = int(mtu)
if not (576 <= m <= 9000):
errors.append(f"{label}: vpn_information.explicit_overrides.mtu {mtu} is out of valid range (576-9000).")
except (ValueError, TypeError):
errors.append(f"{label}: vpn_information.explicit_overrides.mtu '{mtu}' is not a valid integer.")
# WG VLANs have no server_identities or dhcp block -- skip remaining validation
errors.append(f"{plabel}: ip '{pip_str}' is not a valid IPv4 address.")
continue
if not vlan.get("server_identities"):
errors.append(f"{label}: server_identities is empty or missing.")
continue
d = vlan.get("dhcp", {})
required_dhcp = {"subnet", "subnet_mask", "dynamic_pool_start",
"dynamic_pool_end", "lease_time"}
missing = required_dhcp - set(d.keys())
if missing:
errors.append(f"{label}: missing dhcp fields: {missing}")
for field in ("subnet", "subnet_mask"):
if not vlan.get(field):
errors.append(f"{label}: missing required top-level field '{field}'.")
if not vlan.get("subnet") or not vlan.get("subnet_mask"):
continue
if not is_wg(vlan):
try:
network = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
vlan_networks[iface] = network
except ValueError as e:
errors.append(f"{label}: invalid subnet/subnet_mask: {e}")
continue
d = vlan.get("dhcp_information", {})
required_dhcp = {"dynamic_pool_start", "dynamic_pool_end", "lease_time"}
missing = required_dhcp - set(d.keys())
if missing:
errors.append(f"{label}: missing dhcp_information fields: {missing}")
continue
def check_ip(field_label, ip_str, allow_none=False):
if ip_str is None:
if not allow_none:
errors.append(f"{label}: {field_label} is null/missing.")
return None
try:
network = ipaddress.IPv4Network(f"{d['subnet']}/{d['subnet_mask']}", strict=False)
vlan_networks[iface] = network
except ValueError as e:
errors.append(f"{label}: invalid subnet/subnet_mask: {e}")
continue
ip = ipaddress.IPv4Address(ip_str)
except ValueError:
errors.append(f"{label}: {field_label} '{ip_str}' is not a valid IPv4 address.")
return None
if ip not in network:
errors.append(f"{label}: {field_label} '{ip_str}' is not within subnet {network}.")
return ip
def check_ip(field_label, ip_str, allow_none=False):
if ip_str is None:
if not allow_none:
errors.append(f"{label}: {field_label} is null/missing.")
return None
try:
ip = ipaddress.IPv4Address(ip_str)
except ValueError:
errors.append(f"{label}: {field_label} '{ip_str}' is not a valid IPv4 address.")
return None
if ip not in network:
errors.append(f"{label}: {field_label} '{ip_str}' is not within subnet {network}.")
return ip
identity_ips = []
for idx, ident in enumerate(vlan["server_identities"]):
ip = check_ip(
f"server_identities[{idx}] '{ident.get('description', '?')}'",
ident.get("ip")
)
if ip:
identity_ips.append(ip)
identity_ips = []
for idx, ident in enumerate(vlan["server_identities"]):
ip = check_ip(
f"server_identities[{idx}] '{ident.get('description', '?')}'",
ident.get("ip")
)
if ip:
identity_ips.append(ip)
# -- Validate explicit_overrides -----------------------------------
eo = d.get("explicit_overrides", {})
if not isinstance(eo, dict):
errors.append(f"{label}: explicit_overrides must be a plain object.")
else:
gw = eo.get("gateway", "")
if gw:
gw_ip = check_ip("explicit_overrides.gateway", gw)
if gw_ip and gw_ip not in identity_ips:
errors.append(
f"{label}: explicit_overrides.gateway '{gw}' does not match "
f"any server_identity IP. Must be one of: "
f"{[str(ip) for ip in identity_ips]}."
)
dns = eo.get("dns_server", "")
if dns:
check_ip("explicit_overrides.dns_server", dns)
ntp = eo.get("ntp_server", "")
if ntp:
check_ip("explicit_overrides.ntp_server", ntp)
# -- Validate explicit_overrides -----------------------------------
eo = d.get("explicit_overrides", {})
if not isinstance(eo, dict):
errors.append(f"{label}: explicit_overrides must be a plain object.")
pool_start = check_ip("dynamic_pool_start", d["dynamic_pool_start"])
pool_end = check_ip("dynamic_pool_end", d["dynamic_pool_end"])
if pool_start and pool_end and pool_start > pool_end:
errors.append(
f"{label}: dynamic_pool_start '{pool_start}' is greater than "
f"dynamic_pool_end '{pool_end}'."
)
if pool_start and pool_end:
for ip in identity_ips:
if pool_start <= ip <= pool_end:
errors.append(
f"{label}: server_identity '{ip}' falls inside the dynamic "
f"pool ({pool_start} - {pool_end})."
)
seen_res_ips = {}
seen_res_macs = {}
for r in vlan.get("reservations", []):
rdesc = r.get("description", "?")
rmac = r.get("mac", "").lower().strip()
if is_dynamic_ip(r):
rip = None # no pinned IP -- skip all IP validation
else:
gw = eo.get("gateway", "")
if gw:
gw_ip = check_ip("explicit_overrides.gateway", gw)
if gw_ip and gw_ip not in identity_ips:
rip = check_ip(f"reservation '{rdesc}' ip", r.get("ip"))
if rip:
if pool_start and pool_end and pool_start <= rip <= pool_end:
errors.append(
f"{label}: reservation '{rdesc}' ip '{rip}' falls inside "
f"the dynamic pool ({pool_start} - {pool_end})."
)
rip_str = str(rip)
if rip_str in seen_res_ips:
# Allow same IP for different MACs (multi-interface device)
# Only flag if same MAC is also duplicated (caught below)
if rmac and rmac in seen_res_ips[rip_str]:
errors.append(
f"{label}: explicit_overrides.gateway '{gw}' does not match "
f"any server_identity IP. Must be one of: "
f"{[str(ip) for ip in identity_ips]}."
f"{label}: reservation '{rdesc}' ip '{rip}' and MAC '{rmac}' "
f"duplicates '{seen_res_ips[rip_str][rmac]}'."
)
dns = eo.get("dns_server", "")
if dns:
check_ip("explicit_overrides.dns_server", dns)
ntp = eo.get("ntp_server", "")
if ntp:
check_ip("explicit_overrides.ntp_server", ntp)
pool_start = check_ip("dynamic_pool_start", d["dynamic_pool_start"])
pool_end = check_ip("dynamic_pool_end", d["dynamic_pool_end"])
if pool_start and pool_end and pool_start > pool_end:
errors.append(
f"{label}: dynamic_pool_start '{pool_start}' is greater than "
f"dynamic_pool_end '{pool_end}'."
)
if pool_start and pool_end:
for ip in identity_ips:
if pool_start <= ip <= pool_end:
errors.append(
f"{label}: server_identity '{ip}' falls inside the dynamic "
f"pool ({pool_start} - {pool_end})."
)
seen_res_ips = {}
seen_res_macs = {}
for r in vlan.get("reservations", []):
rdesc = r.get("description", "?")
rmac = r.get("mac", "").lower().strip()
if is_dynamic_ip(r):
rip = None # no pinned IP -- skip all IP validation
else:
seen_res_ips[rip_str][rmac] = rdesc
else:
rip = check_ip(f"reservation '{rdesc}' ip", r.get("ip"))
seen_res_ips[rip_str] = {rmac: rdesc}
if rip in identity_ips:
errors.append(
f"{label}: reservation '{rdesc}' ip '{rip}' conflicts "
f"with a server_identity."
)
if rip:
if pool_start and pool_end and pool_start <= rip <= pool_end:
errors.append(
f"{label}: reservation '{rdesc}' ip '{rip}' falls inside "
f"the dynamic pool ({pool_start} - {pool_end})."
)
rip_str = str(rip)
if rip_str in seen_res_ips:
# Allow same IP for different MACs (multi-interface device)
# Only flag if same MAC is also duplicated (caught below)
if rmac and rmac in seen_res_ips[rip_str]:
errors.append(
f"{label}: reservation '{rdesc}' ip '{rip}' and MAC '{rmac}' "
f"duplicates '{seen_res_ips[rip_str][rmac]}'."
)
else:
seen_res_ips[rip_str][rmac] = rdesc
else:
seen_res_ips[rip_str] = {rmac: rdesc}
if rip in identity_ips:
errors.append(
f"{label}: reservation '{rdesc}' ip '{rip}' conflicts "
f"with a server_identity."
)
if rmac:
if rmac in seen_res_macs:
errors.append(
f"{label}: reservation '{rdesc}' MAC '{rmac}' duplicates "
f"'{seen_res_macs[rmac]}'."
)
else:
seen_res_macs[rmac] = rdesc
if rmac:
if rmac in seen_res_macs:
errors.append(
f"{label}: reservation '{rdesc}' MAC '{rmac}' duplicates "
f"'{seen_res_macs[rmac]}'."
)
else:
seen_res_macs[rmac] = rdesc
for bl_name in vlan.get("use_blocklists", []):
if bl_name not in blocklists_by_name:
@ -631,6 +743,25 @@ def validate_config(data):
errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). "
f"Only one VLAN may be the RADIUS default.")
# -- host_overrides validation ---------------------------------------------
all_vlan_nets = list(vlan_networks.values())
for idx, entry in enumerate(data.get("host_overrides", [])):
lbl = f"host_overrides[{idx}] '{entry.get('host', '?')}'"
if not entry.get("host"):
errors.append(f"{lbl}: missing 'host' field.")
ip_str = entry.get("ip", "")
if not ip_str:
errors.append(f"{lbl}: missing 'ip' field.")
else:
try:
ip_addr = ipaddress.IPv4Address(ip_str)
if all_vlan_nets and not any(ip_addr in net for net in all_vlan_nets):
errors.append(
f"{lbl}: '{ip_str}' does not fall within any configured VLAN subnet."
)
except ValueError:
errors.append(f"{lbl}: '{ip_str}' is not a valid IPv4 address.")
# -- banned_ips validation -------------------------------------------------
for idx, entry in enumerate(data.get("banned_ips", [])):
ip = entry.get("ip", "")
@ -947,7 +1078,7 @@ def build_vlan_dnsmasq_conf(vlan, data):
overrides = [o for o in data.get("host_overrides", []) if o.get("enabled") is True]
name = vlan["name"]
iface = vlan["interface"]
d = vlan.get("dhcp", {})
d = vlan.get("dhcp_information", {})
opts = resolve_vlan_options(vlan)
gateway = opts["gateway"]
@ -982,7 +1113,8 @@ def build_vlan_dnsmasq_conf(vlan, data):
if not is_wg(vlan):
line("# -- DHCP -----------------------------------------------------------")
line(f"dhcp-range=set:{name},{d['dynamic_pool_start']},{d['dynamic_pool_end']},{d['subnet_mask']},{d['lease_time']}")
dotted_mask = prefix_to_dotted(vlan['subnet_mask'])
line(f"dhcp-range=set:{name},{d['dynamic_pool_start']},{d['dynamic_pool_end']},{dotted_mask},{d['lease_time']}")
line(f"domain={d.get('domain', 'local')}")
line()
line(f"dhcp-option=tag:{name},option:router,{gateway}")
@ -1207,13 +1339,7 @@ def ensure_chrony(data):
content = chrony_conf.read_text()
subnets = []
for v in data["vlans"]:
if is_wg(v):
# Derive subnet from gateway IP -- always a /24
gw = v["vpn_information"]["gateway"]
net = ipaddress.IPv4Network(f"{gw}/24", strict=False)
subnets.append(str(net))
else:
subnets.append(str(network_for(v)))
subnets.append(str(network_for(v)))
added = []
for subnet in subnets:
line = f"allow {subnet}"
@ -1297,6 +1423,123 @@ def wg_interface_up(iface):
capture_output=True, text=True)
return result.returncode == 0
def wg_server_key_path(iface):
return WG_DIR / f"{iface}.key"
def wg_server_pubkey_path(iface):
"""Public key written to the configs dir so the Flask app can read it."""
return SCRIPT_DIR / f".wg-{iface}.pub"
def wg_conf_path_for(iface):
return WG_DIR / f"{iface}.conf"
def generate_wg_server_key(iface):
WG_DIR.mkdir(exist_ok=True)
result = subprocess.run(["wg", "genkey"], capture_output=True, text=True, check=True)
private = result.stdout.strip()
kf = wg_server_key_path(iface)
kf.write_text(private + "\n")
kf.chmod(0o600)
return private
def build_wg_server_conf(vlan, server_private_key):
"""Build the /etc/wireguard/<iface>.conf content from core.json peers."""
iface = vlan["interface"]
info = vlan["vpn_information"]
gateway = resolve_vlan_options(vlan)["gateway"]
network = network_for(vlan)
server_ip = f"{gateway}/{network.prefixlen}"
listen_port = info["listen_port"]
domain = info.get("domain", "local")
L = [
"# Generated by core.py -- do not edit manually.",
"# Run: sudo python3 core.py --apply",
"",
"[Interface]",
f"PrivateKey = {server_private_key}",
f"Address = {server_ip}",
f"ListenPort = {listen_port}",
"",
]
for peer in vlan.get("peers", []):
if not peer.get("enabled", True):
L += [f"# DISABLED: {peer['name']}", ""]
continue
L += [
f"# {peer['name']}",
"[Peer]",
f"PublicKey = {peer['public_key']}",
f"AllowedIPs = {peer['ip']}/32",
f"PersistentKeepalive = {WG_KEEPALIVE}",
"",
]
return "\n".join(L)
def ensure_wg_interfaces(data):
"""Generate WireGuard server confs and bring up / sync all WG interfaces."""
wg_vlans = [v for v in data.get("vlans", []) if is_wg(v)]
if not wg_vlans:
return
for vlan in wg_vlans:
iface = vlan["interface"]
print(f" [{iface}]")
kf = wg_server_key_path(iface)
if not kf.exists():
print(f" Generating server private key...")
private = generate_wg_server_key(iface)
else:
private = kf.read_text().strip()
pub_result = subprocess.run(
["wg", "pubkey"], input=private, capture_output=True, text=True, check=True
)
public = pub_result.stdout.strip()
pubkey_file = wg_server_pubkey_path(iface)
pubkey_file.write_text(public + "\n")
chown_to_script_dir_owner(pubkey_file)
print(f" Server public key: {public[:20]}...")
WG_DIR.mkdir(exist_ok=True)
conf_file = wg_conf_path_for(iface)
new_conf = build_wg_server_conf(vlan, private)
listen_port = vlan["vpn_information"]["listen_port"]
port_changed = False
if conf_file.exists():
m = re.search(r'ListenPort\s*=\s*(\d+)', conf_file.read_text())
if m and int(m.group(1)) != listen_port:
port_changed = True
conf_file.write_text(new_conf)
conf_file.chmod(0o600)
peer_count = len([p for p in vlan.get("peers", []) if p.get("enabled", True)])
print(f" Wrote {conf_file} ({peer_count} enabled peer(s))")
if not wg_interface_up(iface):
print(f" Bringing up {iface}...")
r = subprocess.run(["wg-quick", "up", iface], capture_output=True, text=True)
if r.returncode != 0:
print(f" WARNING: wg-quick up {iface} failed: {r.stderr.strip()}")
else:
print(f" {iface} is up.")
elif port_changed:
print(f" Listen port changed -- restarting {iface}...")
subprocess.run(["wg-quick", "down", iface], capture_output=True, text=True)
r = subprocess.run(["wg-quick", "up", iface], capture_output=True, text=True)
if r.returncode != 0:
print(f" WARNING: wg-quick up {iface} failed: {r.stderr.strip()}")
else:
print(f" {iface} restarted.")
else:
print(f" Syncing peers to live {iface}...")
subprocess.run(["wg", "syncconf", iface, str(conf_file)], capture_output=True, text=True)
def get_container_bridges():
"""Return all active bridge interfaces not managed by our VLAN config.
Works universally for Docker, Podman, LXC, libvirt, etc. -- anything
@ -1358,9 +1601,7 @@ def apply_dnsmasq_instances(data, dry_run=False, start_if_needed=True):
for vlan in data["vlans"]:
if is_wg(vlan) and not dry_run and not wg_interface_up(vlan["interface"]):
print(f"Skipped VLAN '{vlan['name']}': {vlan['interface']} is not up (WireGuard not running).")
print(" To enable the VPN VLAN, start WireGuard with vpn.py --apply")
print(" (core.py --apply will be called again automatically).")
print(f"Skipped VLAN '{vlan['name']}': {vlan['interface']} is not up. Run --apply again after WireGuard is up.")
continue
conf_content = build_vlan_dnsmasq_conf(vlan, data)
@ -1690,13 +1931,11 @@ def build_nft_config(data, dry_run=False):
# Build interface -> network map for nat_ip -> iface lookup in forward chain
vlan_networks = {}
for v in vlans:
if not is_wg(v):
d = v.get("dhcp", {})
try:
net = ipaddress.IPv4Network(f"{d['subnet']}/{d['subnet_mask']}", strict=False)
vlan_networks[v["interface"]] = net
except (KeyError, ValueError):
pass
try:
net = network_for(v)
vlan_networks[v["interface"]] = net
except (KeyError, ValueError):
pass
all_except = rule_enabled(data.get("inter_vlan_exceptions", []))
banned_v4, banned_v6 = banned_ip_sets(data)
@ -2000,16 +2239,14 @@ def apply_nftables(data, dry_run=False):
print("nftables rules applied successfully.")
# Build set of active subnets for filtering exception display
import ipaddress as _ipaddress
active_subnets = []
for v in data["vlans"]:
if is_wg(v):
if wg_interface_up(v["interface"]):
gw = v["vpn_information"]["gateway"]
active_subnets.append(_ipaddress.IPv4Network(f"{gw}/24", strict=False))
else:
d = v["dhcp"]
active_subnets.append(_ipaddress.IPv4Network(f"{d['subnet']}/{d['subnet_mask']}", strict=False))
if is_wg(v) and not wg_interface_up(v["interface"]):
continue
try:
active_subnets.append(network_for(v))
except (KeyError, ValueError):
pass
def dst_is_active(r):
dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "")
@ -2894,14 +3131,7 @@ def _dry_run_conflicting_services(data):
chrony_conf = Path("/etc/chrony/chrony.conf")
if chrony_conf.exists():
content = chrony_conf.read_text()
subnets = []
for v in data["vlans"]:
if is_wg(v):
gw = v["vpn_information"]["gateway"]
net = ipaddress.IPv4Network(f"{gw}/24", strict=False)
subnets.append(str(net))
else:
subnets.append(str(network_for(v)))
subnets = [str(network_for(v)) for v in data["vlans"]]
missing = [s for s in subnets if f"allow {s}" not in content]
if missing:
print(f" Would add chrony allow directives for: {', '.join(missing)}")
@ -3261,6 +3491,7 @@ def cmd_apply(data, dry_run=False):
dnsmasq confs, start/restart all services whose interface is up, nftables,
timer, and boot service. Safe to run repeatedly.
"""
inject_interfaces(data)
if dry_run:
print("[DRY RUN] --apply would perform the following actions:")
print()
@ -3307,14 +3538,16 @@ def cmd_apply(data, dry_run=False):
total_enabled = sum(
len([r for r in v.get("reservations", []) if r.get("enabled") is True])
for v in data["vlans"]
for v in data["vlans"] if not is_wg(v)
)
total_disabled = sum(
len([r for r in v.get("reservations", []) if r.get("enabled") is not True])
for v in data["vlans"]
for v in data["vlans"] if not is_wg(v)
)
total_wg_peers = sum(len(v.get("peers", [])) for v in data["vlans"] if is_wg(v))
wg_part = f", {total_wg_peers} WG peer(s)" if total_wg_peers else ""
print(f"Applying config: {len(data['vlans'])} VLAN(s), "
f"{total_enabled} reservation(s), {total_disabled} skipped.")
f"{total_enabled} reservation(s), {total_disabled} skipped{wg_part}.")
print()
print("-- Conflicting services ----------------------------------------------")
@ -3327,6 +3560,11 @@ def cmd_apply(data, dry_run=False):
apply_networkd(data, only_if_changed=True)
print()
if any(is_wg(v) for v in data["vlans"]):
print("-- WireGuard interfaces ----------------------------------------------")
ensure_wg_interfaces(data)
print()
print("-- dnsmasq instances -------------------------------------------------")
if not blocklists_available(data):
print(" NOTE: No merged blocklist files found -- blocklist rules will be absent.")