diff --git a/routlin/core.py b/routlin/core.py index 34e1211..adc0fb3 100644 --- a/routlin/core.py +++ b/routlin/core.py @@ -103,7 +103,7 @@ from validation import ( VALID_PROTOCOLS, VALID_BLOCKLIST_FORMATS, int_range, domainname, is_wg, is_dynamic_ip, - resolve_vlan_derived_fields, validate_config, + derive_vlan_id, derive_interface, validate_config, ) PRODUCT_NAME = "routlin" @@ -251,18 +251,18 @@ def resolve_vlan_options(vlan): } def is_physical(vlan): - return vlan["vlan_id"] == 1 + return derive_vlan_id(vlan.get("subnet", ""), vlan.get("subnet_mask", 24)) == 1 def networkd_stem(vlan): return f"10-{PRODUCT_NAME}-{vlan['name']}" -def vlan_service_name(vlan): +def vlan_service_name(vlan, iface): if is_wg(vlan): - return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}-{vlan['interface']}" + return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}-{iface}" return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}" -def vlan_service_file(vlan): - return SYSTEMD_DIR / f"{vlan_service_name(vlan)}.service" +def vlan_service_file(vlan, iface): + return SYSTEMD_DIR / f"{vlan_service_name(vlan, iface)}.service" def vlan_conf_file(vlan): return DNSMASQ_CONF_DIR / f"{vlan['name']}.conf" @@ -308,21 +308,21 @@ def load_config(): # Build systemd-networkd files # =================================================================== -def build_netdev(vlan): +def build_netdev(vlan, vid, iface): return "\n".join([ "# Generated by core.py -- do not edit manually.", "# Edit core.json and re-run: sudo python3 core.py --apply", "", "[NetDev]", - f"Name={vlan['interface']}", + f"Name={iface}", "Kind=vlan", "", "[VLAN]", - f"Id={vlan['vlan_id']}", + f"Id={vid}", "", ]) -def build_network(vlan, all_vlan_ids): +def build_network(vlan, vid, iface, all_vlan_ids): network = network_for(vlan) prefix = network.prefixlen lines = [ @@ -330,7 +330,7 @@ def build_network(vlan, all_vlan_ids): "# Edit core.json and re-run: sudo python3 core.py --apply", "", "[Match]", - f"Name={vlan['interface']}", + f"Name={iface}", "", "[Network]", "DHCP=no", @@ -342,9 +342,9 @@ def build_network(vlan, all_vlan_ids): if is_physical(vlan): lines.append("") - for vid in all_vlan_ids: - if vid != 1: - lines.append(f"VLAN={vlan['interface']}.{vid}") + for v in all_vlan_ids: + if v != 1: + lines.append(f"VLAN={iface}.{v}") lines.append("") return "\n".join(lines) @@ -370,8 +370,8 @@ def apply_networkd(data, dry_run=False, only_if_changed=False): If only_if_changed=True, write files only when content differs from disk and skip the networkd reload if nothing changed. Used by --apply mode. """ - all_vlan_ids = [v["vlan_id"] for v in data["vlans"] if not is_wg(v)] - managed_ifaces = [v["interface"] for v in data["vlans"]] + all_vlan_ids = [derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) for v in data["vlans"] if not is_wg(v)] + managed_ifaces = [derive_interface(v, data) for v in data["vlans"]] changed = False legacy = find_legacy_files(managed_ifaces) @@ -387,11 +387,13 @@ def apply_networkd(data, dry_run=False, only_if_changed=False): for vlan in data["vlans"]: if is_wg(vlan): continue + iface = derive_interface(vlan, data) + vid = derive_vlan_id(vlan.get('subnet', ''), vlan.get('subnet_mask', 24)) stem = networkd_stem(vlan) if not is_physical(vlan): netdev_path = NETWORKD_DIR / f"{stem}.netdev" - netdev_content = build_netdev(vlan) + netdev_content = build_netdev(vlan, vid, iface) if dry_run: print(f"# -- {netdev_path} (dry-run) --") print(netdev_content) @@ -405,7 +407,7 @@ def apply_networkd(data, dry_run=False, only_if_changed=False): print(f"Unchanged: {netdev_path}") network_path = NETWORKD_DIR / f"{stem}.network" - network_content = build_network(vlan, all_vlan_ids) + network_content = build_network(vlan, vid, iface, all_vlan_ids) if dry_run: print(f"# -- {network_path} (dry-run) --") print(network_content) @@ -595,13 +597,12 @@ def _wan_has_ipv6(iface): return False -def build_vlan_dnsmasq_conf(vlan, data): +def build_vlan_dnsmasq_conf(vlan, data, iface): """Generate the complete dnsmasq config for one VLAN instance.""" dns_cfg = data.get("upstream_dns", {}) general = data.get("general", {}) overrides = [o for o in data.get("host_overrides", []) if o.get("enabled") is True] name = vlan["name"] - iface = vlan["interface"] d = vlan.get("dhcp_information", {}) opts = resolve_vlan_options(vlan) gateway = opts["gateway"] @@ -619,7 +620,7 @@ def build_vlan_dnsmasq_conf(vlan, data): line("# Generated by core.py -- do not edit manually.") line("# Edit core.json and re-run: sudo python3 core.py --apply") - line(f"# VLAN: {name} (vlan_id={vlan['vlan_id']})") + line(f"# VLAN: {name} (vlan_id={derive_vlan_id(vlan.get('subnet', ''), vlan.get('subnet_mask', 24))})") line() line(f"pid-file={vlan_pid_file(vlan)}") if not is_wg(vlan): @@ -737,9 +738,8 @@ def build_vlan_dnsmasq_conf(vlan, data): # Build per-VLAN systemd service unit # =================================================================== -def build_vlan_service(vlan): +def build_vlan_service(vlan, iface): name = vlan["name"] - iface = vlan["interface"] conf = vlan_conf_file(vlan) if is_wg(vlan): @@ -938,9 +938,8 @@ def generate_wg_server_key(iface): kf.chmod(0o600) return private -def build_wg_server_conf(vlan, server_private_key): +def build_wg_server_conf(vlan, server_private_key, iface): """Build the /etc/wireguard/.conf content from core.json peers.""" - iface = vlan["interface"] info = vlan["vpn_information"] gateway = resolve_vlan_options(vlan)["gateway"] network = network_for(vlan) @@ -981,7 +980,7 @@ def ensure_wg_interfaces(data): return for vlan in wg_vlans: - iface = vlan["interface"] + iface = derive_interface(vlan, data) print(f" [{iface}]") kf = wg_server_key_path(iface) @@ -1002,7 +1001,7 @@ def ensure_wg_interfaces(data): WG_DIR.mkdir(exist_ok=True) conf_file = wg_conf_path_for(iface) - new_conf = build_wg_server_conf(vlan, private) + new_conf = build_wg_server_conf(vlan, private, iface) listen_port = vlan["vpn_information"]["listen_port"] port_changed = False @@ -1088,7 +1087,7 @@ def apply_dnsmasq_instances(data, dry_run=False, start_if_needed=True): start_if_needed=False (--apply): only restart instances already running; skip with a warning if not running. """ - active_service_stems = {vlan_service_name(vlan) for vlan in data["vlans"]} + active_service_stems = {vlan_service_name(vlan, derive_interface(vlan, data)) for vlan in data["vlans"]} if not dry_run: DNSMASQ_CONF_DIR.mkdir(exist_ok=True) @@ -1096,14 +1095,15 @@ def apply_dnsmasq_instances(data, dry_run=False, start_if_needed=True): print() for vlan in data["vlans"]: - if is_wg(vlan) and not dry_run and not wg_interface_up(vlan["interface"]): - print(f"Skipped VLAN '{vlan['name']}': {vlan['interface']} is not up. Run --apply again after WireGuard is up.") + iface = derive_interface(vlan, data) + if is_wg(vlan) and not dry_run and not wg_interface_up(iface): + print(f"Skipped VLAN '{vlan['name']}': {iface} is not up. Run --apply again after WireGuard is up.") continue - conf_content = build_vlan_dnsmasq_conf(vlan, data) - svc_content = build_vlan_service(vlan) + conf_content = build_vlan_dnsmasq_conf(vlan, data, iface) + svc_content = build_vlan_service(vlan, iface) conf_path = vlan_conf_file(vlan) - svc_path = vlan_service_file(vlan) + svc_path = vlan_service_file(vlan, iface) if dry_run: print(f"# -- {conf_path} (dry-run) --") @@ -1140,9 +1140,10 @@ def apply_dnsmasq_instances(data, dry_run=False, start_if_needed=True): if start_if_needed: print("Starting dnsmasq instances...") for vlan in data["vlans"]: - if is_wg(vlan) and not wg_interface_up(vlan["interface"]): + iface = derive_interface(vlan, data) + if is_wg(vlan) and not wg_interface_up(iface): continue - svc = vlan_service_name(vlan) + svc = vlan_service_name(vlan, iface) subprocess.run(["systemctl", "enable", svc], capture_output=True, text=True) result = subprocess.run(["systemctl", "restart", svc], capture_output=True, text=True) @@ -1153,9 +1154,10 @@ def apply_dnsmasq_instances(data, dry_run=False, start_if_needed=True): else: print("Reloading dnsmasq instances...") for vlan in data["vlans"]: - if is_wg(vlan) and not wg_interface_up(vlan["interface"]): + iface = derive_interface(vlan, data) + if is_wg(vlan) and not wg_interface_up(iface): continue - svc = vlan_service_name(vlan) + svc = vlan_service_name(vlan, iface) state = subprocess.run( ["systemctl", "is-active", svc], capture_output=True, text=True @@ -1475,18 +1477,18 @@ def build_nft_config(data, dry_run=False): # Exclude WG VLANs whose interface is not up -- nft rejects rules that # reference non-existent interfaces, which would leave no firewall at all. vlans = [v for v in data["vlans"] - if not is_wg(v) or dry_run or wg_interface_up(v["interface"])] + if not is_wg(v) or dry_run or wg_interface_up(derive_interface(v, data))] all_fwd = list(rule_enabled(data.get("port_forwarding", []))) all_wrngl = [(v, r) for v in vlans for r in rule_enabled(v.get("port_wrangling", []))] # Interfaces that are active (WG interfaces only included if up) - active_ifaces = {v["interface"] for v in vlans} + active_ifaces = {derive_interface(v, data) for v in vlans} # Build interface -> network map for nat_ip -> iface lookup in forward chain vlan_networks = {} for v in vlans: try: net = network_for(v) - vlan_networks[v["interface"]] = net + vlan_networks[derive_interface(v, data)] = net except (KeyError, ValueError): pass @@ -1525,7 +1527,7 @@ def build_nft_config(data, dry_run=False): line(" # -- Port wrangling (redirect VLAN traffic to local host) ----") line() for vlan, rule in all_wrngl: - iface = vlan["interface"] + iface = derive_interface(vlan, data) for proto, r, suffix in expand_protocols(rule): line(f" # {r['description']}{suffix}") line(f" iif \"{iface}\" {proto} dport {r['dest_port']} ip daddr != {r['redirect_to']} dnat to {r['redirect_to']}") @@ -1608,7 +1610,7 @@ def build_nft_config(data, dry_run=False): line(" # Allow all traffic inbound from any VLAN interface") for vlan in vlans: - line(f" iif \"{vlan['interface']}\" accept # {vlan['name']}") + line(f" iif \"{derive_interface(vlan, data)}\" accept # {vlan['name']}") line() if all_fwd: @@ -1639,14 +1641,14 @@ def build_nft_config(data, dry_run=False): line(" # Allow each VLAN -> WAN (outbound internet)") for vlan in vlans: - line(f" iif \"{vlan['interface']}\" oif \"{wan}\" accept # {vlan['name']} -> WAN") + line(f" iif \"{derive_interface(vlan, data)}\" oif \"{wan}\" accept # {vlan['name']} -> WAN") line() if container_bridges: line(" # Allow VLAN -> Docker bridge forwarding") for vlan in vlans: for bridge in container_bridges: - line(f" iif \"{vlan['interface']}\" oif \"{bridge}\" ct state new accept" + line(f" iif \"{derive_interface(vlan, data)}\" oif \"{bridge}\" ct state new accept" f" # {vlan['name']} -> {bridge}") line() @@ -1769,9 +1771,9 @@ def apply_nftables(data, dry_run=False): print(config) return - active_ifaces = {v["interface"] for v in data["vlans"] - if not is_wg(v) or wg_interface_up(v["interface"])} - active_vlans = [v for v in data["vlans"] if v["interface"] in active_ifaces] + active_ifaces = {derive_interface(v, data) for v in data["vlans"] + if not is_wg(v) or wg_interface_up(derive_interface(v, data))} + active_vlans = [v for v in data["vlans"] if derive_interface(v, data) in active_ifaces] all_fwd = list(rule_enabled(data.get("port_forwarding", []))) all_dis_fwd = list(rule_disabled(data.get("port_forwarding", []))) @@ -1794,7 +1796,7 @@ def apply_nftables(data, dry_run=False): # Build set of active subnets for filtering exception display active_subnets = [] for v in data["vlans"]: - if is_wg(v) and not wg_interface_up(v["interface"]): + if is_wg(v) and not wg_interface_up(derive_interface(v, data)): continue try: active_subnets.append(network_for(v)) @@ -1984,7 +1986,7 @@ def build_radius_users(data): ] for vlan in data["vlans"]: - vlan_id = vlan["vlan_id"] + vlan_id = derive_vlan_id(vlan.get('subnet', ''), vlan.get('subnet_mask', 24)) for r in vlan.get("reservations", []): if r.get("enabled") is not True: continue @@ -2000,7 +2002,7 @@ def build_radius_users(data): "", ] - default_id = default_vlan["vlan_id"] + default_id = derive_vlan_id(default_vlan.get('subnet', ''), default_vlan.get('subnet_mask', 24)) lines += [ f"# Default -- unknown MACs land on VLAN {default_id} ({default_vlan['name']})", "DEFAULT Auth-Type := Accept", @@ -2067,7 +2069,7 @@ def avahi_enabled(data): def avahi_interfaces(data): """Return list of interface names for VLANs with mdns_reflection enabled.""" - return [v["interface"] for v in data.get("vlans", []) if v.get("mdns_reflection") is True and not is_wg(v)] + return [derive_interface(v, data) for v in data.get("vlans", []) if v.get("mdns_reflection") is True and not is_wg(v)] def build_avahi_conf(data): """Patch avahi-daemon.conf directives needed for cross-VLAN mDNS reflection. @@ -2185,10 +2187,11 @@ def show_status(data): units = [] for vlan in data["vlans"]: - if is_wg(vlan) and not wg_interface_up(vlan["interface"]): - units.append((vlan_service_name(vlan), "(wg0 not up)", "active")) + iface = derive_interface(vlan, data) + if is_wg(vlan) and not wg_interface_up(iface): + units.append((vlan_service_name(vlan, iface), "(wg0 not up)", "active")) else: - units.append((vlan_service_name(vlan), None, "active")) + units.append((vlan_service_name(vlan, iface), None, "active")) units.append((f"{BLIST_TIMER_NAME}.timer", None, "active")) units.append((NAT_SERVICE_NAME, None, "inactive")) # oneshot - exits after running units.append(("freeradius", None, "active")) @@ -2251,7 +2254,7 @@ def reset_leases(data, vlan_name=None): # Stop for vlan in vlans: - svc = vlan_service_name(vlan) + svc = vlan_service_name(vlan, derive_interface(vlan, data)) result = subprocess.run(["systemctl", "stop", svc], capture_output=True, text=True) if result.returncode == 0: @@ -2272,7 +2275,7 @@ def reset_leases(data, vlan_name=None): # Restart print() for vlan in vlans: - svc = vlan_service_name(vlan) + svc = vlan_service_name(vlan, derive_interface(vlan, data)) result = subprocess.run(["systemctl", "start", svc], capture_output=True, text=True) if result.returncode == 0: @@ -2371,7 +2374,7 @@ def collect_metrics(data): any_running = False for vlan in data["vlans"]: - svc = vlan_service_name(vlan) + svc = vlan_service_name(vlan, derive_interface(vlan, data)) result = subprocess.run( ["systemctl", "kill", "--signal=SIGUSR1", svc], capture_output=True, text=True @@ -2388,7 +2391,7 @@ def collect_metrics(data): server_map = {} for vlan in data["vlans"]: - svc = vlan_service_name(vlan) + svc = vlan_service_name(vlan, derive_interface(vlan, data)) result = subprocess.run( ["journalctl", "-u", svc, "--since", "5 seconds ago", "--no-pager", "-o", "cat"], @@ -2539,7 +2542,7 @@ def stop_instances(data): remove_dashboard_timer() print() for vlan in data["vlans"]: - svc = vlan_service_name(vlan) + svc = vlan_service_name(vlan, derive_interface(vlan, data)) subprocess.run(["systemctl", "disable", "--now", svc], capture_output=True, text=True) print(f"Stopped and disabled: {svc}") @@ -2549,7 +2552,7 @@ def disable_all(data): stop_instances(data) print() for vlan in data["vlans"]: - for f in (vlan_conf_file(vlan), vlan_service_file(vlan)): + for f in (vlan_conf_file(vlan), vlan_service_file(vlan, derive_interface(vlan, data))): if f.exists(): f.unlink() print(f"Removed: {f}") @@ -2769,9 +2772,10 @@ def _dry_run_disable(data, iface, use_dhcp, static_cidr, resolv_ok, dns_choice, print(f"-- Stopping {PRODUCT_NAME} services (dry-run) --------------------------------") print(f" Would disable and stop: {BLIST_TIMER_NAME}.timer") for vlan in data["vlans"]: - svc = vlan_service_name(vlan) + iface = derive_interface(vlan, data) + svc = vlan_service_name(vlan, iface) conf = vlan_conf_file(vlan) - svc_f = vlan_service_file(vlan) + svc_f = vlan_service_file(vlan, iface) print(f" Would stop and disable: {svc}") if conf.exists(): print(f" Would remove: {conf}") @@ -2838,7 +2842,6 @@ def _dry_run_disable(data, iface, use_dhcp, static_cidr, resolv_ok, dns_choice, def cmd_disable(data, dry_run=False): """Interactive wizard to revert the machine from router to plain network client.""" import readline - data = resolve_vlan_derived_fields(data) print() print("=" * 70) @@ -2875,7 +2878,7 @@ def cmd_disable(data, dry_run=False): if physical is None: die("No physical VLAN (vlan_id=1) found in config. Cannot determine interface.") - iface = physical["interface"] + iface = derive_interface(physical, data) print(" How should this machine obtain its IP address after reversion?") print() @@ -3039,7 +3042,6 @@ def cmd_apply(data, dry_run=False): dnsmasq confs, start/restart all services whose interface is up, nftables, timer, and boot service. Safe to run repeatedly. """ - data = resolve_vlan_derived_fields(data) if dry_run: print("[DRY RUN] --apply would perform the following actions:") print() @@ -3071,7 +3073,7 @@ def cmd_apply(data, dry_run=False): print(f" Would write: {RADIUS_USERS_FILE}") print(f" {total_macs} MAC reservation(s)") if default_vlan: - print(f" DEFAULT -> VLAN {default_vlan['vlan_id']} ({default_vlan['name']})") + print(f" DEFAULT -> VLAN {derive_vlan_id(default_vlan.get('subnet', ''), default_vlan.get('subnet_mask', 24))} ({default_vlan['name']})") print(f" Would ensure freeradius is running") if avahi_enabled(data): print() diff --git a/routlin/validation.py b/routlin/validation.py index c30a700..50a5180 100644 --- a/routlin/validation.py +++ b/routlin/validation.py @@ -267,32 +267,22 @@ def derive_vlan_id(subnet, prefix): return None -def resolve_vlan_derived_fields(data): - """Return a deep copy of data with vlan_id and interface computed for every VLAN. - - WireGuard VLANs are assigned wg0/wg1/... in ascending vlan_id order for - deterministic interface naming regardless of JSON list order. - Does not mutate the input dict. - """ - import copy - result = copy.deepcopy(data) - lan = result.get("general", {}).get("lan_interface", "eth0") - vlans = result.get("vlans", []) - - for vlan in vlans: - vlan["vlan_id"] = derive_vlan_id(vlan.get("subnet", ""), vlan.get("subnet_mask", 24)) - - wg_entries = [(i, v) for i, v in enumerate(vlans) if is_wg(v)] - wg_sorted = sorted(wg_entries, key=lambda x: (x[1].get("vlan_id") is None, x[1].get("vlan_id") or 0)) - for wg_idx, (_, vlan) in enumerate(wg_sorted): - vlan["interface"] = f"wg{wg_idx}" - - for vlan in vlans: - if not is_wg(vlan): - vid = vlan.get("vlan_id", 1) - vlan["interface"] = lan if vid == 1 else f"{lan}.{vid}" - - return result +def derive_interface(vlan, data): + """Derive the interface name for a VLAN without mutating data.""" + lan = data.get('general', {}).get('lan_interface', 'eth0') + if is_wg(vlan): + wg_vlans = [v for v in data.get('vlans', []) if is_wg(v)] + wg_sorted = sorted( + wg_vlans, + key=lambda v: ( + derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) is None, + derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) or 0, + ) + ) + idx = next((i for i, v in enumerate(wg_sorted) if v is vlan), 0) + return f'wg{idx}' + vid = derive_vlan_id(vlan.get('subnet', ''), vlan.get('subnet_mask', 24)) + return lan if vid == 1 else f'{lan}.{vid}' # ===================================================================