Development

This commit is contained in:
Matthew Grotke 2026-05-21 09:35:03 -04:00
parent fb76f893e9
commit efbd21cb59
2 changed files with 84 additions and 92 deletions

View file

@ -103,7 +103,7 @@ from validation import (
VALID_PROTOCOLS, VALID_BLOCKLIST_FORMATS, VALID_PROTOCOLS, VALID_BLOCKLIST_FORMATS,
int_range, domainname, int_range, domainname,
is_wg, is_dynamic_ip, is_wg, is_dynamic_ip,
resolve_vlan_derived_fields, validate_config, derive_vlan_id, derive_interface, validate_config,
) )
PRODUCT_NAME = "routlin" PRODUCT_NAME = "routlin"
@ -251,18 +251,18 @@ def resolve_vlan_options(vlan):
} }
def is_physical(vlan): def is_physical(vlan):
return vlan["vlan_id"] == 1 return derive_vlan_id(vlan.get("subnet", ""), vlan.get("subnet_mask", 24)) == 1
def networkd_stem(vlan): def networkd_stem(vlan):
return f"10-{PRODUCT_NAME}-{vlan['name']}" return f"10-{PRODUCT_NAME}-{vlan['name']}"
def vlan_service_name(vlan): def vlan_service_name(vlan, iface):
if is_wg(vlan): if is_wg(vlan):
return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}-{vlan['interface']}" return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}-{iface}"
return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}" return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}"
def vlan_service_file(vlan): def vlan_service_file(vlan, iface):
return SYSTEMD_DIR / f"{vlan_service_name(vlan)}.service" return SYSTEMD_DIR / f"{vlan_service_name(vlan, iface)}.service"
def vlan_conf_file(vlan): def vlan_conf_file(vlan):
return DNSMASQ_CONF_DIR / f"{vlan['name']}.conf" return DNSMASQ_CONF_DIR / f"{vlan['name']}.conf"
@ -308,21 +308,21 @@ def load_config():
# Build systemd-networkd files # Build systemd-networkd files
# =================================================================== # ===================================================================
def build_netdev(vlan): def build_netdev(vlan, vid, iface):
return "\n".join([ return "\n".join([
"# Generated by core.py -- do not edit manually.", "# Generated by core.py -- do not edit manually.",
"# Edit core.json and re-run: sudo python3 core.py --apply", "# Edit core.json and re-run: sudo python3 core.py --apply",
"", "",
"[NetDev]", "[NetDev]",
f"Name={vlan['interface']}", f"Name={iface}",
"Kind=vlan", "Kind=vlan",
"", "",
"[VLAN]", "[VLAN]",
f"Id={vlan['vlan_id']}", f"Id={vid}",
"", "",
]) ])
def build_network(vlan, all_vlan_ids): def build_network(vlan, vid, iface, all_vlan_ids):
network = network_for(vlan) network = network_for(vlan)
prefix = network.prefixlen prefix = network.prefixlen
lines = [ lines = [
@ -330,7 +330,7 @@ def build_network(vlan, all_vlan_ids):
"# Edit core.json and re-run: sudo python3 core.py --apply", "# Edit core.json and re-run: sudo python3 core.py --apply",
"", "",
"[Match]", "[Match]",
f"Name={vlan['interface']}", f"Name={iface}",
"", "",
"[Network]", "[Network]",
"DHCP=no", "DHCP=no",
@ -342,9 +342,9 @@ def build_network(vlan, all_vlan_ids):
if is_physical(vlan): if is_physical(vlan):
lines.append("") lines.append("")
for vid in all_vlan_ids: for v in all_vlan_ids:
if vid != 1: if v != 1:
lines.append(f"VLAN={vlan['interface']}.{vid}") lines.append(f"VLAN={iface}.{v}")
lines.append("") lines.append("")
return "\n".join(lines) return "\n".join(lines)
@ -370,8 +370,8 @@ def apply_networkd(data, dry_run=False, only_if_changed=False):
If only_if_changed=True, write files only when content differs from disk If only_if_changed=True, write files only when content differs from disk
and skip the networkd reload if nothing changed. Used by --apply mode. and skip the networkd reload if nothing changed. Used by --apply mode.
""" """
all_vlan_ids = [v["vlan_id"] for v in data["vlans"] if not is_wg(v)] all_vlan_ids = [derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) for v in data["vlans"] if not is_wg(v)]
managed_ifaces = [v["interface"] for v in data["vlans"]] managed_ifaces = [derive_interface(v, data) for v in data["vlans"]]
changed = False changed = False
legacy = find_legacy_files(managed_ifaces) legacy = find_legacy_files(managed_ifaces)
@ -387,11 +387,13 @@ def apply_networkd(data, dry_run=False, only_if_changed=False):
for vlan in data["vlans"]: for vlan in data["vlans"]:
if is_wg(vlan): if is_wg(vlan):
continue continue
iface = derive_interface(vlan, data)
vid = derive_vlan_id(vlan.get('subnet', ''), vlan.get('subnet_mask', 24))
stem = networkd_stem(vlan) stem = networkd_stem(vlan)
if not is_physical(vlan): if not is_physical(vlan):
netdev_path = NETWORKD_DIR / f"{stem}.netdev" netdev_path = NETWORKD_DIR / f"{stem}.netdev"
netdev_content = build_netdev(vlan) netdev_content = build_netdev(vlan, vid, iface)
if dry_run: if dry_run:
print(f"# -- {netdev_path} (dry-run) --") print(f"# -- {netdev_path} (dry-run) --")
print(netdev_content) print(netdev_content)
@ -405,7 +407,7 @@ def apply_networkd(data, dry_run=False, only_if_changed=False):
print(f"Unchanged: {netdev_path}") print(f"Unchanged: {netdev_path}")
network_path = NETWORKD_DIR / f"{stem}.network" network_path = NETWORKD_DIR / f"{stem}.network"
network_content = build_network(vlan, all_vlan_ids) network_content = build_network(vlan, vid, iface, all_vlan_ids)
if dry_run: if dry_run:
print(f"# -- {network_path} (dry-run) --") print(f"# -- {network_path} (dry-run) --")
print(network_content) print(network_content)
@ -595,13 +597,12 @@ def _wan_has_ipv6(iface):
return False return False
def build_vlan_dnsmasq_conf(vlan, data): def build_vlan_dnsmasq_conf(vlan, data, iface):
"""Generate the complete dnsmasq config for one VLAN instance.""" """Generate the complete dnsmasq config for one VLAN instance."""
dns_cfg = data.get("upstream_dns", {}) dns_cfg = data.get("upstream_dns", {})
general = data.get("general", {}) general = data.get("general", {})
overrides = [o for o in data.get("host_overrides", []) if o.get("enabled") is True] overrides = [o for o in data.get("host_overrides", []) if o.get("enabled") is True]
name = vlan["name"] name = vlan["name"]
iface = vlan["interface"]
d = vlan.get("dhcp_information", {}) d = vlan.get("dhcp_information", {})
opts = resolve_vlan_options(vlan) opts = resolve_vlan_options(vlan)
gateway = opts["gateway"] gateway = opts["gateway"]
@ -619,7 +620,7 @@ def build_vlan_dnsmasq_conf(vlan, data):
line("# Generated by core.py -- do not edit manually.") line("# Generated by core.py -- do not edit manually.")
line("# Edit core.json and re-run: sudo python3 core.py --apply") line("# Edit core.json and re-run: sudo python3 core.py --apply")
line(f"# VLAN: {name} (vlan_id={vlan['vlan_id']})") line(f"# VLAN: {name} (vlan_id={derive_vlan_id(vlan.get('subnet', ''), vlan.get('subnet_mask', 24))})")
line() line()
line(f"pid-file={vlan_pid_file(vlan)}") line(f"pid-file={vlan_pid_file(vlan)}")
if not is_wg(vlan): if not is_wg(vlan):
@ -737,9 +738,8 @@ def build_vlan_dnsmasq_conf(vlan, data):
# Build per-VLAN systemd service unit # Build per-VLAN systemd service unit
# =================================================================== # ===================================================================
def build_vlan_service(vlan): def build_vlan_service(vlan, iface):
name = vlan["name"] name = vlan["name"]
iface = vlan["interface"]
conf = vlan_conf_file(vlan) conf = vlan_conf_file(vlan)
if is_wg(vlan): if is_wg(vlan):
@ -938,9 +938,8 @@ def generate_wg_server_key(iface):
kf.chmod(0o600) kf.chmod(0o600)
return private return private
def build_wg_server_conf(vlan, server_private_key): def build_wg_server_conf(vlan, server_private_key, iface):
"""Build the /etc/wireguard/<iface>.conf content from core.json peers.""" """Build the /etc/wireguard/<iface>.conf content from core.json peers."""
iface = vlan["interface"]
info = vlan["vpn_information"] info = vlan["vpn_information"]
gateway = resolve_vlan_options(vlan)["gateway"] gateway = resolve_vlan_options(vlan)["gateway"]
network = network_for(vlan) network = network_for(vlan)
@ -981,7 +980,7 @@ def ensure_wg_interfaces(data):
return return
for vlan in wg_vlans: for vlan in wg_vlans:
iface = vlan["interface"] iface = derive_interface(vlan, data)
print(f" [{iface}]") print(f" [{iface}]")
kf = wg_server_key_path(iface) kf = wg_server_key_path(iface)
@ -1002,7 +1001,7 @@ def ensure_wg_interfaces(data):
WG_DIR.mkdir(exist_ok=True) WG_DIR.mkdir(exist_ok=True)
conf_file = wg_conf_path_for(iface) conf_file = wg_conf_path_for(iface)
new_conf = build_wg_server_conf(vlan, private) new_conf = build_wg_server_conf(vlan, private, iface)
listen_port = vlan["vpn_information"]["listen_port"] listen_port = vlan["vpn_information"]["listen_port"]
port_changed = False port_changed = False
@ -1088,7 +1087,7 @@ def apply_dnsmasq_instances(data, dry_run=False, start_if_needed=True):
start_if_needed=False (--apply): only restart instances already running; start_if_needed=False (--apply): only restart instances already running;
skip with a warning if not running. skip with a warning if not running.
""" """
active_service_stems = {vlan_service_name(vlan) for vlan in data["vlans"]} active_service_stems = {vlan_service_name(vlan, derive_interface(vlan, data)) for vlan in data["vlans"]}
if not dry_run: if not dry_run:
DNSMASQ_CONF_DIR.mkdir(exist_ok=True) DNSMASQ_CONF_DIR.mkdir(exist_ok=True)
@ -1096,14 +1095,15 @@ def apply_dnsmasq_instances(data, dry_run=False, start_if_needed=True):
print() print()
for vlan in data["vlans"]: for vlan in data["vlans"]:
if is_wg(vlan) and not dry_run and not wg_interface_up(vlan["interface"]): iface = derive_interface(vlan, data)
print(f"Skipped VLAN '{vlan['name']}': {vlan['interface']} is not up. Run --apply again after WireGuard is up.") if is_wg(vlan) and not dry_run and not wg_interface_up(iface):
print(f"Skipped VLAN '{vlan['name']}': {iface} is not up. Run --apply again after WireGuard is up.")
continue continue
conf_content = build_vlan_dnsmasq_conf(vlan, data) conf_content = build_vlan_dnsmasq_conf(vlan, data, iface)
svc_content = build_vlan_service(vlan) svc_content = build_vlan_service(vlan, iface)
conf_path = vlan_conf_file(vlan) conf_path = vlan_conf_file(vlan)
svc_path = vlan_service_file(vlan) svc_path = vlan_service_file(vlan, iface)
if dry_run: if dry_run:
print(f"# -- {conf_path} (dry-run) --") print(f"# -- {conf_path} (dry-run) --")
@ -1140,9 +1140,10 @@ def apply_dnsmasq_instances(data, dry_run=False, start_if_needed=True):
if start_if_needed: if start_if_needed:
print("Starting dnsmasq instances...") print("Starting dnsmasq instances...")
for vlan in data["vlans"]: for vlan in data["vlans"]:
if is_wg(vlan) and not wg_interface_up(vlan["interface"]): iface = derive_interface(vlan, data)
if is_wg(vlan) and not wg_interface_up(iface):
continue continue
svc = vlan_service_name(vlan) svc = vlan_service_name(vlan, iface)
subprocess.run(["systemctl", "enable", svc], capture_output=True, text=True) subprocess.run(["systemctl", "enable", svc], capture_output=True, text=True)
result = subprocess.run(["systemctl", "restart", svc], result = subprocess.run(["systemctl", "restart", svc],
capture_output=True, text=True) capture_output=True, text=True)
@ -1153,9 +1154,10 @@ def apply_dnsmasq_instances(data, dry_run=False, start_if_needed=True):
else: else:
print("Reloading dnsmasq instances...") print("Reloading dnsmasq instances...")
for vlan in data["vlans"]: for vlan in data["vlans"]:
if is_wg(vlan) and not wg_interface_up(vlan["interface"]): iface = derive_interface(vlan, data)
if is_wg(vlan) and not wg_interface_up(iface):
continue continue
svc = vlan_service_name(vlan) svc = vlan_service_name(vlan, iface)
state = subprocess.run( state = subprocess.run(
["systemctl", "is-active", svc], ["systemctl", "is-active", svc],
capture_output=True, text=True capture_output=True, text=True
@ -1475,18 +1477,18 @@ def build_nft_config(data, dry_run=False):
# Exclude WG VLANs whose interface is not up -- nft rejects rules that # Exclude WG VLANs whose interface is not up -- nft rejects rules that
# reference non-existent interfaces, which would leave no firewall at all. # reference non-existent interfaces, which would leave no firewall at all.
vlans = [v for v in data["vlans"] vlans = [v for v in data["vlans"]
if not is_wg(v) or dry_run or wg_interface_up(v["interface"])] if not is_wg(v) or dry_run or wg_interface_up(derive_interface(v, data))]
all_fwd = list(rule_enabled(data.get("port_forwarding", []))) all_fwd = list(rule_enabled(data.get("port_forwarding", [])))
all_wrngl = [(v, r) for v in vlans for r in rule_enabled(v.get("port_wrangling", []))] all_wrngl = [(v, r) for v in vlans for r in rule_enabled(v.get("port_wrangling", []))]
# Interfaces that are active (WG interfaces only included if up) # Interfaces that are active (WG interfaces only included if up)
active_ifaces = {v["interface"] for v in vlans} active_ifaces = {derive_interface(v, data) for v in vlans}
# Build interface -> network map for nat_ip -> iface lookup in forward chain # Build interface -> network map for nat_ip -> iface lookup in forward chain
vlan_networks = {} vlan_networks = {}
for v in vlans: for v in vlans:
try: try:
net = network_for(v) net = network_for(v)
vlan_networks[v["interface"]] = net vlan_networks[derive_interface(v, data)] = net
except (KeyError, ValueError): except (KeyError, ValueError):
pass pass
@ -1525,7 +1527,7 @@ def build_nft_config(data, dry_run=False):
line(" # -- Port wrangling (redirect VLAN traffic to local host) ----") line(" # -- Port wrangling (redirect VLAN traffic to local host) ----")
line() line()
for vlan, rule in all_wrngl: for vlan, rule in all_wrngl:
iface = vlan["interface"] iface = derive_interface(vlan, data)
for proto, r, suffix in expand_protocols(rule): for proto, r, suffix in expand_protocols(rule):
line(f" # {r['description']}{suffix}") line(f" # {r['description']}{suffix}")
line(f" iif \"{iface}\" {proto} dport {r['dest_port']} ip daddr != {r['redirect_to']} dnat to {r['redirect_to']}") line(f" iif \"{iface}\" {proto} dport {r['dest_port']} ip daddr != {r['redirect_to']} dnat to {r['redirect_to']}")
@ -1608,7 +1610,7 @@ def build_nft_config(data, dry_run=False):
line(" # Allow all traffic inbound from any VLAN interface") line(" # Allow all traffic inbound from any VLAN interface")
for vlan in vlans: for vlan in vlans:
line(f" iif \"{vlan['interface']}\" accept # {vlan['name']}") line(f" iif \"{derive_interface(vlan, data)}\" accept # {vlan['name']}")
line() line()
if all_fwd: if all_fwd:
@ -1639,14 +1641,14 @@ def build_nft_config(data, dry_run=False):
line(" # Allow each VLAN -> WAN (outbound internet)") line(" # Allow each VLAN -> WAN (outbound internet)")
for vlan in vlans: for vlan in vlans:
line(f" iif \"{vlan['interface']}\" oif \"{wan}\" accept # {vlan['name']} -> WAN") line(f" iif \"{derive_interface(vlan, data)}\" oif \"{wan}\" accept # {vlan['name']} -> WAN")
line() line()
if container_bridges: if container_bridges:
line(" # Allow VLAN -> Docker bridge forwarding") line(" # Allow VLAN -> Docker bridge forwarding")
for vlan in vlans: for vlan in vlans:
for bridge in container_bridges: for bridge in container_bridges:
line(f" iif \"{vlan['interface']}\" oif \"{bridge}\" ct state new accept" line(f" iif \"{derive_interface(vlan, data)}\" oif \"{bridge}\" ct state new accept"
f" # {vlan['name']} -> {bridge}") f" # {vlan['name']} -> {bridge}")
line() line()
@ -1769,9 +1771,9 @@ def apply_nftables(data, dry_run=False):
print(config) print(config)
return return
active_ifaces = {v["interface"] for v in data["vlans"] active_ifaces = {derive_interface(v, data) for v in data["vlans"]
if not is_wg(v) or wg_interface_up(v["interface"])} if not is_wg(v) or wg_interface_up(derive_interface(v, data))}
active_vlans = [v for v in data["vlans"] if v["interface"] in active_ifaces] active_vlans = [v for v in data["vlans"] if derive_interface(v, data) in active_ifaces]
all_fwd = list(rule_enabled(data.get("port_forwarding", []))) all_fwd = list(rule_enabled(data.get("port_forwarding", [])))
all_dis_fwd = list(rule_disabled(data.get("port_forwarding", []))) all_dis_fwd = list(rule_disabled(data.get("port_forwarding", [])))
@ -1794,7 +1796,7 @@ def apply_nftables(data, dry_run=False):
# Build set of active subnets for filtering exception display # Build set of active subnets for filtering exception display
active_subnets = [] active_subnets = []
for v in data["vlans"]: for v in data["vlans"]:
if is_wg(v) and not wg_interface_up(v["interface"]): if is_wg(v) and not wg_interface_up(derive_interface(v, data)):
continue continue
try: try:
active_subnets.append(network_for(v)) active_subnets.append(network_for(v))
@ -1984,7 +1986,7 @@ def build_radius_users(data):
] ]
for vlan in data["vlans"]: for vlan in data["vlans"]:
vlan_id = vlan["vlan_id"] vlan_id = derive_vlan_id(vlan.get('subnet', ''), vlan.get('subnet_mask', 24))
for r in vlan.get("reservations", []): for r in vlan.get("reservations", []):
if r.get("enabled") is not True: if r.get("enabled") is not True:
continue continue
@ -2000,7 +2002,7 @@ def build_radius_users(data):
"", "",
] ]
default_id = default_vlan["vlan_id"] default_id = derive_vlan_id(default_vlan.get('subnet', ''), default_vlan.get('subnet_mask', 24))
lines += [ lines += [
f"# Default -- unknown MACs land on VLAN {default_id} ({default_vlan['name']})", f"# Default -- unknown MACs land on VLAN {default_id} ({default_vlan['name']})",
"DEFAULT Auth-Type := Accept", "DEFAULT Auth-Type := Accept",
@ -2067,7 +2069,7 @@ def avahi_enabled(data):
def avahi_interfaces(data): def avahi_interfaces(data):
"""Return list of interface names for VLANs with mdns_reflection enabled.""" """Return list of interface names for VLANs with mdns_reflection enabled."""
return [v["interface"] for v in data.get("vlans", []) if v.get("mdns_reflection") is True and not is_wg(v)] return [derive_interface(v, data) for v in data.get("vlans", []) if v.get("mdns_reflection") is True and not is_wg(v)]
def build_avahi_conf(data): def build_avahi_conf(data):
"""Patch avahi-daemon.conf directives needed for cross-VLAN mDNS reflection. """Patch avahi-daemon.conf directives needed for cross-VLAN mDNS reflection.
@ -2185,10 +2187,11 @@ def show_status(data):
units = [] units = []
for vlan in data["vlans"]: for vlan in data["vlans"]:
if is_wg(vlan) and not wg_interface_up(vlan["interface"]): iface = derive_interface(vlan, data)
units.append((vlan_service_name(vlan), "(wg0 not up)", "active")) if is_wg(vlan) and not wg_interface_up(iface):
units.append((vlan_service_name(vlan, iface), "(wg0 not up)", "active"))
else: else:
units.append((vlan_service_name(vlan), None, "active")) units.append((vlan_service_name(vlan, iface), None, "active"))
units.append((f"{BLIST_TIMER_NAME}.timer", None, "active")) units.append((f"{BLIST_TIMER_NAME}.timer", None, "active"))
units.append((NAT_SERVICE_NAME, None, "inactive")) # oneshot - exits after running units.append((NAT_SERVICE_NAME, None, "inactive")) # oneshot - exits after running
units.append(("freeradius", None, "active")) units.append(("freeradius", None, "active"))
@ -2251,7 +2254,7 @@ def reset_leases(data, vlan_name=None):
# Stop # Stop
for vlan in vlans: for vlan in vlans:
svc = vlan_service_name(vlan) svc = vlan_service_name(vlan, derive_interface(vlan, data))
result = subprocess.run(["systemctl", "stop", svc], result = subprocess.run(["systemctl", "stop", svc],
capture_output=True, text=True) capture_output=True, text=True)
if result.returncode == 0: if result.returncode == 0:
@ -2272,7 +2275,7 @@ def reset_leases(data, vlan_name=None):
# Restart # Restart
print() print()
for vlan in vlans: for vlan in vlans:
svc = vlan_service_name(vlan) svc = vlan_service_name(vlan, derive_interface(vlan, data))
result = subprocess.run(["systemctl", "start", svc], result = subprocess.run(["systemctl", "start", svc],
capture_output=True, text=True) capture_output=True, text=True)
if result.returncode == 0: if result.returncode == 0:
@ -2371,7 +2374,7 @@ def collect_metrics(data):
any_running = False any_running = False
for vlan in data["vlans"]: for vlan in data["vlans"]:
svc = vlan_service_name(vlan) svc = vlan_service_name(vlan, derive_interface(vlan, data))
result = subprocess.run( result = subprocess.run(
["systemctl", "kill", "--signal=SIGUSR1", svc], ["systemctl", "kill", "--signal=SIGUSR1", svc],
capture_output=True, text=True capture_output=True, text=True
@ -2388,7 +2391,7 @@ def collect_metrics(data):
server_map = {} server_map = {}
for vlan in data["vlans"]: for vlan in data["vlans"]:
svc = vlan_service_name(vlan) svc = vlan_service_name(vlan, derive_interface(vlan, data))
result = subprocess.run( result = subprocess.run(
["journalctl", "-u", svc, "--since", "5 seconds ago", ["journalctl", "-u", svc, "--since", "5 seconds ago",
"--no-pager", "-o", "cat"], "--no-pager", "-o", "cat"],
@ -2539,7 +2542,7 @@ def stop_instances(data):
remove_dashboard_timer() remove_dashboard_timer()
print() print()
for vlan in data["vlans"]: for vlan in data["vlans"]:
svc = vlan_service_name(vlan) svc = vlan_service_name(vlan, derive_interface(vlan, data))
subprocess.run(["systemctl", "disable", "--now", svc], subprocess.run(["systemctl", "disable", "--now", svc],
capture_output=True, text=True) capture_output=True, text=True)
print(f"Stopped and disabled: {svc}") print(f"Stopped and disabled: {svc}")
@ -2549,7 +2552,7 @@ def disable_all(data):
stop_instances(data) stop_instances(data)
print() print()
for vlan in data["vlans"]: for vlan in data["vlans"]:
for f in (vlan_conf_file(vlan), vlan_service_file(vlan)): for f in (vlan_conf_file(vlan), vlan_service_file(vlan, derive_interface(vlan, data))):
if f.exists(): if f.exists():
f.unlink() f.unlink()
print(f"Removed: {f}") print(f"Removed: {f}")
@ -2769,9 +2772,10 @@ def _dry_run_disable(data, iface, use_dhcp, static_cidr, resolv_ok, dns_choice,
print(f"-- Stopping {PRODUCT_NAME} services (dry-run) --------------------------------") print(f"-- Stopping {PRODUCT_NAME} services (dry-run) --------------------------------")
print(f" Would disable and stop: {BLIST_TIMER_NAME}.timer") print(f" Would disable and stop: {BLIST_TIMER_NAME}.timer")
for vlan in data["vlans"]: for vlan in data["vlans"]:
svc = vlan_service_name(vlan) iface = derive_interface(vlan, data)
svc = vlan_service_name(vlan, iface)
conf = vlan_conf_file(vlan) conf = vlan_conf_file(vlan)
svc_f = vlan_service_file(vlan) svc_f = vlan_service_file(vlan, iface)
print(f" Would stop and disable: {svc}") print(f" Would stop and disable: {svc}")
if conf.exists(): if conf.exists():
print(f" Would remove: {conf}") print(f" Would remove: {conf}")
@ -2838,7 +2842,6 @@ def _dry_run_disable(data, iface, use_dhcp, static_cidr, resolv_ok, dns_choice,
def cmd_disable(data, dry_run=False): def cmd_disable(data, dry_run=False):
"""Interactive wizard to revert the machine from router to plain network client.""" """Interactive wizard to revert the machine from router to plain network client."""
import readline import readline
data = resolve_vlan_derived_fields(data)
print() print()
print("=" * 70) print("=" * 70)
@ -2875,7 +2878,7 @@ def cmd_disable(data, dry_run=False):
if physical is None: if physical is None:
die("No physical VLAN (vlan_id=1) found in config. Cannot determine interface.") die("No physical VLAN (vlan_id=1) found in config. Cannot determine interface.")
iface = physical["interface"] iface = derive_interface(physical, data)
print(" How should this machine obtain its IP address after reversion?") print(" How should this machine obtain its IP address after reversion?")
print() print()
@ -3039,7 +3042,6 @@ def cmd_apply(data, dry_run=False):
dnsmasq confs, start/restart all services whose interface is up, nftables, dnsmasq confs, start/restart all services whose interface is up, nftables,
timer, and boot service. Safe to run repeatedly. timer, and boot service. Safe to run repeatedly.
""" """
data = resolve_vlan_derived_fields(data)
if dry_run: if dry_run:
print("[DRY RUN] --apply would perform the following actions:") print("[DRY RUN] --apply would perform the following actions:")
print() print()
@ -3071,7 +3073,7 @@ def cmd_apply(data, dry_run=False):
print(f" Would write: {RADIUS_USERS_FILE}") print(f" Would write: {RADIUS_USERS_FILE}")
print(f" {total_macs} MAC reservation(s)") print(f" {total_macs} MAC reservation(s)")
if default_vlan: if default_vlan:
print(f" DEFAULT -> VLAN {default_vlan['vlan_id']} ({default_vlan['name']})") print(f" DEFAULT -> VLAN {derive_vlan_id(default_vlan.get('subnet', ''), default_vlan.get('subnet_mask', 24))} ({default_vlan['name']})")
print(f" Would ensure freeradius is running") print(f" Would ensure freeradius is running")
if avahi_enabled(data): if avahi_enabled(data):
print() print()

View file

@ -267,32 +267,22 @@ def derive_vlan_id(subnet, prefix):
return None return None
def resolve_vlan_derived_fields(data): def derive_interface(vlan, data):
"""Return a deep copy of data with vlan_id and interface computed for every VLAN. """Derive the interface name for a VLAN without mutating data."""
lan = data.get('general', {}).get('lan_interface', 'eth0')
WireGuard VLANs are assigned wg0/wg1/... in ascending vlan_id order for if is_wg(vlan):
deterministic interface naming regardless of JSON list order. wg_vlans = [v for v in data.get('vlans', []) if is_wg(v)]
Does not mutate the input dict. wg_sorted = sorted(
""" wg_vlans,
import copy key=lambda v: (
result = copy.deepcopy(data) derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) is None,
lan = result.get("general", {}).get("lan_interface", "eth0") derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) or 0,
vlans = result.get("vlans", []) )
)
for vlan in vlans: idx = next((i for i, v in enumerate(wg_sorted) if v is vlan), 0)
vlan["vlan_id"] = derive_vlan_id(vlan.get("subnet", ""), vlan.get("subnet_mask", 24)) return f'wg{idx}'
vid = derive_vlan_id(vlan.get('subnet', ''), vlan.get('subnet_mask', 24))
wg_entries = [(i, v) for i, v in enumerate(vlans) if is_wg(v)] return lan if vid == 1 else f'{lan}.{vid}'
wg_sorted = sorted(wg_entries, key=lambda x: (x[1].get("vlan_id") is None, x[1].get("vlan_id") or 0))
for wg_idx, (_, vlan) in enumerate(wg_sorted):
vlan["interface"] = f"wg{wg_idx}"
for vlan in vlans:
if not is_wg(vlan):
vid = vlan.get("vlan_id", 1)
vlan["interface"] = lan if vid == 1 else f"{lan}.{vid}"
return result
# =================================================================== # ===================================================================