Development
This commit is contained in:
parent
270856b391
commit
2bfa5ff29a
18 changed files with 814 additions and 565 deletions
505
router/core.py
505
router/core.py
|
|
@ -100,7 +100,12 @@ import urllib.error
|
|||
import argparse
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from validation import VALID_PROTOCOLS, VALID_BLOCKLIST_FORMATS
|
||||
from validation import (
|
||||
VALID_PROTOCOLS, VALID_BLOCKLIST_FORMATS,
|
||||
int_range, domainname,
|
||||
inject_interfaces, is_wg, is_dynamic_ip,
|
||||
validate_config,
|
||||
)
|
||||
|
||||
SCRIPT_DIR = Path(__file__).parent
|
||||
CONFIG_FILE = SCRIPT_DIR / "core.json"
|
||||
|
|
@ -247,29 +252,6 @@ def resolve_vlan_options(vlan):
|
|||
def is_physical(vlan):
|
||||
return vlan["vlan_id"] == 1
|
||||
|
||||
def is_wg(vlan):
|
||||
return vlan.get("is_vpn", False)
|
||||
|
||||
|
||||
def inject_interfaces(data):
|
||||
"""Compute and inject the 'interface' field for every VLAN from is_vpn + vlan_id.
|
||||
|
||||
is_vpn=False (regular VLAN):
|
||||
vlan_id 1 → general.lan_interface (e.g. enp6s0)
|
||||
vlan_id N → lan_interface.N (e.g. enp6s0.10)
|
||||
is_vpn=True (WireGuard VLAN):
|
||||
1st WG VLAN → wg0, 2nd → wg1, etc. (order in vlans array)
|
||||
"""
|
||||
lan = data.get("general", {}).get("lan_interface", "eth0")
|
||||
wg_idx = 0
|
||||
for vlan in data.get("vlans", []):
|
||||
if vlan.get("is_vpn"):
|
||||
vlan["interface"] = f"wg{wg_idx}"
|
||||
wg_idx += 1
|
||||
else:
|
||||
vid = vlan.get("vlan_id", 1)
|
||||
vlan["interface"] = lan if vid == 1 else f"{lan}.{vid}"
|
||||
|
||||
def networkd_stem(vlan):
|
||||
return f"10-router-{vlan['name']}"
|
||||
|
||||
|
|
@ -297,13 +279,6 @@ def rule_enabled(rules):
|
|||
def rule_disabled(rules):
|
||||
return [r for r in rules if r.get("enabled") is not True]
|
||||
|
||||
def is_dynamic_ip(r):
|
||||
"""Return True if a reservation has no pinned IP -- DHCP assigns from pool.
|
||||
Triggered by: ip field absent, empty string, or the keyword 'dynamic'.
|
||||
"""
|
||||
ip = r.get("ip", "dynamic")
|
||||
return ip in ("", "dynamic") or ip is None
|
||||
|
||||
def expand_protocols(rule):
|
||||
"""Return list of (protocol, rule, comment_suffix) tuples.
|
||||
When protocol is 'both', expands into tcp and udp with suffixes
|
||||
|
|
@ -328,467 +303,6 @@ def load_config():
|
|||
die("No vlans defined in core.json.")
|
||||
return data
|
||||
|
||||
# ===================================================================
|
||||
# Validate
|
||||
# ===================================================================
|
||||
|
||||
def validate_config(data):
|
||||
inject_interfaces(data)
|
||||
errors = []
|
||||
seen_vlan_ids = {}
|
||||
seen_interfaces = {}
|
||||
seen_names = {}
|
||||
seen_listen_ports = {}
|
||||
|
||||
# -- upstream_dns block ----------------------------------------------------
|
||||
if not data.get("upstream_dns", {}).get("upstream_servers"):
|
||||
errors.append("upstream_dns.upstream_servers is missing or empty.")
|
||||
|
||||
# -- WAN / LAN interfaces --------------------------------------------------
|
||||
gen = data.get("general", {})
|
||||
wan = gen.get("wan_interface", "")
|
||||
lan = gen.get("lan_interface", "")
|
||||
if not wan:
|
||||
errors.append("general.wan_interface is missing or empty.")
|
||||
if not lan:
|
||||
errors.append("general.lan_interface is missing or empty.")
|
||||
if wan and lan:
|
||||
available_interfaces = set()
|
||||
try:
|
||||
result = subprocess.run(["ip", "link", "show"], capture_output=True, text=True)
|
||||
available_interfaces = set(re.findall(r"^\d+:\s+(\S+):", result.stdout, re.MULTILINE))
|
||||
available_interfaces = {i.split("@")[0] for i in available_interfaces}
|
||||
except Exception:
|
||||
pass
|
||||
if available_interfaces:
|
||||
if wan not in available_interfaces:
|
||||
errors.append(f"general.wan_interface: '{wan}' does not exist on this system.")
|
||||
if lan not in available_interfaces:
|
||||
errors.append(f"general.lan_interface: '{lan}' does not exist on this system.")
|
||||
if wan == lan:
|
||||
errors.append(f"general.wan_interface and general.lan_interface must be different (both set to '{wan}').")
|
||||
|
||||
# -- Blocklist library -----------------------------------------------------
|
||||
blocklists_by_name = {}
|
||||
for idx, bl in enumerate(data.get("blocklists", [])):
|
||||
name = bl.get("name", "")
|
||||
label = f"blocklists[{idx}] '{name}'"
|
||||
for field in ("name", "description", "save_as", "url", "format"):
|
||||
if not bl.get(field):
|
||||
errors.append(f"{label}: missing or empty field '{field}'.")
|
||||
if bl.get("format") and bl["format"] not in VALID_BLOCKLIST_FORMATS:
|
||||
errors.append(f"{label}: format must be one of: {', '.join(sorted(VALID_BLOCKLIST_FORMATS))}.")
|
||||
if name:
|
||||
if name in blocklists_by_name:
|
||||
errors.append(f"{label}: duplicate blocklist name '{name}'.")
|
||||
else:
|
||||
blocklists_by_name[name] = bl
|
||||
|
||||
# -- Per-VLAN validation ---------------------------------------------------
|
||||
vlan_networks = {} # interface -> IPv4Network (built for nat validation)
|
||||
|
||||
for vlan in data["vlans"]:
|
||||
vlan_id = vlan.get("vlan_id")
|
||||
name = vlan.get("name", "?")
|
||||
iface = vlan.get("interface", "")
|
||||
label = f"vlan '{name}' (id={vlan_id})"
|
||||
|
||||
if name in seen_names:
|
||||
errors.append(f"{label}: duplicate vlan name '{name}' "
|
||||
f"(also used by id={seen_names[name]}).")
|
||||
else:
|
||||
seen_names[name] = vlan_id
|
||||
|
||||
if vlan_id in seen_vlan_ids:
|
||||
errors.append(f"{label}: duplicate vlan_id {vlan_id} "
|
||||
f"(also used by '{seen_vlan_ids[vlan_id]}').")
|
||||
else:
|
||||
seen_vlan_ids[vlan_id] = name
|
||||
|
||||
if iface in seen_interfaces:
|
||||
errors.append(f"{label}: duplicate interface '{iface}' "
|
||||
f"(also used by '{seen_interfaces[iface]}').")
|
||||
else:
|
||||
seen_interfaces[iface] = name
|
||||
|
||||
if vlan.get("mdns_reflection") is True and is_wg(vlan):
|
||||
errors.append(f"{label}: mdns_reflection must be false for WireGuard interfaces.")
|
||||
|
||||
if is_wg(vlan):
|
||||
# -- vpn_information -----------------------------------------------
|
||||
vpi = vlan.get("vpn_information")
|
||||
if not isinstance(vpi, dict):
|
||||
errors.append(f"{label}: vpn_information must be a plain object.")
|
||||
vpi = {}
|
||||
else:
|
||||
lp = vpi.get("listen_port")
|
||||
if not isinstance(lp, int) or not (1 <= lp <= 65535):
|
||||
errors.append(f"{label}: vpn_information.listen_port must be an integer 1-65535.")
|
||||
elif lp in seen_listen_ports:
|
||||
errors.append(f"{label}: vpn_information.listen_port {lp} is already used by "
|
||||
f"'{seen_listen_ports[lp]}'.")
|
||||
else:
|
||||
seen_listen_ports[lp] = name
|
||||
|
||||
# -- subnet/subnet_mask --------------------------------------------
|
||||
for field in ("subnet", "subnet_mask"):
|
||||
if not vlan.get(field):
|
||||
errors.append(f"{label}: missing required field '{field}'.")
|
||||
wg_net = None
|
||||
if vlan.get("subnet") and vlan.get("subnet_mask"):
|
||||
try:
|
||||
wg_net = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
|
||||
vlan_networks[iface] = wg_net
|
||||
except ValueError as e:
|
||||
errors.append(f"{label}: invalid subnet/subnet_mask: {e}")
|
||||
|
||||
# -- server_identities ---------------------------------------------
|
||||
if not vlan.get("server_identities"):
|
||||
errors.append(f"{label}: server_identities is empty or missing.")
|
||||
identity_ips = []
|
||||
for idx, ident in enumerate(vlan.get("server_identities", [])):
|
||||
ip_str = ident.get("ip", "")
|
||||
ilabel = f"{label} server_identities[{idx}] '{ident.get('description', '?')}'"
|
||||
if not ip_str:
|
||||
errors.append(f"{ilabel}: missing 'ip' field.")
|
||||
continue
|
||||
try:
|
||||
ip = ipaddress.IPv4Address(ip_str)
|
||||
if wg_net and ip not in wg_net:
|
||||
errors.append(f"{ilabel}: ip '{ip_str}' is not within subnet {wg_net}.")
|
||||
else:
|
||||
identity_ips.append(ip)
|
||||
except ValueError:
|
||||
errors.append(f"{ilabel}: ip '{ip_str}' is not a valid IPv4 address.")
|
||||
|
||||
# -- vpn_information.explicit_overrides ----------------------------
|
||||
eo = vpi.get("explicit_overrides", {}) if isinstance(vpi, dict) else {}
|
||||
if not isinstance(eo, dict):
|
||||
errors.append(f"{label}: vpn_information.explicit_overrides must be a plain object.")
|
||||
else:
|
||||
gw = eo.get("gateway", "")
|
||||
if gw:
|
||||
try:
|
||||
gw_ip = ipaddress.IPv4Address(gw)
|
||||
if identity_ips and gw_ip not in identity_ips:
|
||||
errors.append(
|
||||
f"{label}: vpn_information.explicit_overrides.gateway '{gw}' does not match "
|
||||
f"any server_identity IP. Must be one of: "
|
||||
f"{[str(ip) for ip in identity_ips]}."
|
||||
)
|
||||
except ValueError:
|
||||
errors.append(f"{label}: vpn_information.explicit_overrides.gateway '{gw}' is not a valid IPv4 address.")
|
||||
dns = eo.get("dns_server", "")
|
||||
if dns:
|
||||
try:
|
||||
ipaddress.IPv4Address(dns)
|
||||
except ValueError:
|
||||
errors.append(f"{label}: vpn_information.explicit_overrides.dns_server '{dns}' is not a valid IPv4 address.")
|
||||
mtu = eo.get("mtu", "")
|
||||
if mtu:
|
||||
try:
|
||||
m = int(mtu)
|
||||
if not (576 <= m <= 9000):
|
||||
errors.append(f"{label}: vpn_information.explicit_overrides.mtu {mtu} is out of valid range (576-9000).")
|
||||
except (ValueError, TypeError):
|
||||
errors.append(f"{label}: vpn_information.explicit_overrides.mtu '{mtu}' is not a valid integer.")
|
||||
|
||||
# -- peers ---------------------------------------------------------
|
||||
seen_peer_names = {}
|
||||
seen_peer_ips = {}
|
||||
for pidx, peer in enumerate(vlan.get("peers", [])):
|
||||
pname = peer.get("name", "")
|
||||
plabel = f"{label} peer[{pidx}] '{pname}'"
|
||||
if not pname:
|
||||
errors.append(f"{plabel}: missing 'name' field.")
|
||||
elif pname in seen_peer_names:
|
||||
errors.append(f"{plabel}: duplicate peer name '{pname}'.")
|
||||
else:
|
||||
seen_peer_names[pname] = pidx
|
||||
if not peer.get("public_key"):
|
||||
errors.append(f"{plabel}: missing 'public_key' field.")
|
||||
pip_str = peer.get("ip", "")
|
||||
if not pip_str:
|
||||
errors.append(f"{plabel}: missing 'ip' field.")
|
||||
else:
|
||||
try:
|
||||
pip = ipaddress.IPv4Address(pip_str)
|
||||
if wg_net and pip not in wg_net:
|
||||
errors.append(f"{plabel}: ip '{pip_str}' is not within subnet {wg_net}.")
|
||||
if pip in identity_ips:
|
||||
errors.append(f"{plabel}: ip '{pip_str}' conflicts with a server_identity.")
|
||||
if pip_str in seen_peer_ips:
|
||||
errors.append(
|
||||
f"{plabel}: duplicate peer ip '{pip_str}' "
|
||||
f"(also used by peer '{seen_peer_ips[pip_str]}')."
|
||||
)
|
||||
else:
|
||||
seen_peer_ips[pip_str] = pname
|
||||
except ValueError:
|
||||
errors.append(f"{plabel}: ip '{pip_str}' is not a valid IPv4 address.")
|
||||
continue
|
||||
|
||||
if not vlan.get("server_identities"):
|
||||
errors.append(f"{label}: server_identities is empty or missing.")
|
||||
continue
|
||||
|
||||
for field in ("subnet", "subnet_mask"):
|
||||
if not vlan.get(field):
|
||||
errors.append(f"{label}: missing required top-level field '{field}'.")
|
||||
if not vlan.get("subnet") or not vlan.get("subnet_mask"):
|
||||
continue
|
||||
|
||||
try:
|
||||
network = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
|
||||
vlan_networks[iface] = network
|
||||
except ValueError as e:
|
||||
errors.append(f"{label}: invalid subnet/subnet_mask: {e}")
|
||||
continue
|
||||
|
||||
d = vlan.get("dhcp_information", {})
|
||||
required_dhcp = {"dynamic_pool_start", "dynamic_pool_end", "lease_time"}
|
||||
missing = required_dhcp - set(d.keys())
|
||||
if missing:
|
||||
errors.append(f"{label}: missing dhcp_information fields: {missing}")
|
||||
continue
|
||||
|
||||
def check_ip(field_label, ip_str, allow_none=False):
|
||||
if ip_str is None:
|
||||
if not allow_none:
|
||||
errors.append(f"{label}: {field_label} is null/missing.")
|
||||
return None
|
||||
try:
|
||||
ip = ipaddress.IPv4Address(ip_str)
|
||||
except ValueError:
|
||||
errors.append(f"{label}: {field_label} '{ip_str}' is not a valid IPv4 address.")
|
||||
return None
|
||||
if ip not in network:
|
||||
errors.append(f"{label}: {field_label} '{ip_str}' is not within subnet {network}.")
|
||||
return ip
|
||||
|
||||
identity_ips = []
|
||||
for idx, ident in enumerate(vlan["server_identities"]):
|
||||
ip = check_ip(
|
||||
f"server_identities[{idx}] '{ident.get('description', '?')}'",
|
||||
ident.get("ip")
|
||||
)
|
||||
if ip:
|
||||
identity_ips.append(ip)
|
||||
|
||||
# -- Validate explicit_overrides -----------------------------------
|
||||
eo = d.get("explicit_overrides", {})
|
||||
if not isinstance(eo, dict):
|
||||
errors.append(f"{label}: explicit_overrides must be a plain object.")
|
||||
else:
|
||||
gw = eo.get("gateway", "")
|
||||
if gw:
|
||||
gw_ip = check_ip("explicit_overrides.gateway", gw)
|
||||
if gw_ip and gw_ip not in identity_ips:
|
||||
errors.append(
|
||||
f"{label}: explicit_overrides.gateway '{gw}' does not match "
|
||||
f"any server_identity IP. Must be one of: "
|
||||
f"{[str(ip) for ip in identity_ips]}."
|
||||
)
|
||||
dns = eo.get("dns_server", "")
|
||||
if dns:
|
||||
check_ip("explicit_overrides.dns_server", dns)
|
||||
ntp = eo.get("ntp_server", "")
|
||||
if ntp:
|
||||
check_ip("explicit_overrides.ntp_server", ntp)
|
||||
|
||||
pool_start = check_ip("dynamic_pool_start", d["dynamic_pool_start"])
|
||||
pool_end = check_ip("dynamic_pool_end", d["dynamic_pool_end"])
|
||||
|
||||
if pool_start and pool_end and pool_start > pool_end:
|
||||
errors.append(
|
||||
f"{label}: dynamic_pool_start '{pool_start}' is greater than "
|
||||
f"dynamic_pool_end '{pool_end}'."
|
||||
)
|
||||
|
||||
if pool_start and pool_end:
|
||||
for ip in identity_ips:
|
||||
if pool_start <= ip <= pool_end:
|
||||
errors.append(
|
||||
f"{label}: server_identity '{ip}' falls inside the dynamic "
|
||||
f"pool ({pool_start} - {pool_end})."
|
||||
)
|
||||
|
||||
seen_res_ips = {}
|
||||
seen_res_macs = {}
|
||||
for r in vlan.get("reservations", []):
|
||||
rdesc = r.get("description", "?")
|
||||
rmac = r.get("mac", "").lower().strip()
|
||||
|
||||
if is_dynamic_ip(r):
|
||||
rip = None # no pinned IP -- skip all IP validation
|
||||
else:
|
||||
rip = check_ip(f"reservation '{rdesc}' ip", r.get("ip"))
|
||||
|
||||
if rip:
|
||||
if pool_start and pool_end and pool_start <= rip <= pool_end:
|
||||
errors.append(
|
||||
f"{label}: reservation '{rdesc}' ip '{rip}' falls inside "
|
||||
f"the dynamic pool ({pool_start} - {pool_end})."
|
||||
)
|
||||
rip_str = str(rip)
|
||||
if rip_str in seen_res_ips:
|
||||
# Allow same IP for different MACs (multi-interface device)
|
||||
# Only flag if same MAC is also duplicated (caught below)
|
||||
if rmac and rmac in seen_res_ips[rip_str]:
|
||||
errors.append(
|
||||
f"{label}: reservation '{rdesc}' ip '{rip}' and MAC '{rmac}' "
|
||||
f"duplicates '{seen_res_ips[rip_str][rmac]}'."
|
||||
)
|
||||
else:
|
||||
seen_res_ips[rip_str][rmac] = rdesc
|
||||
else:
|
||||
seen_res_ips[rip_str] = {rmac: rdesc}
|
||||
if rip in identity_ips:
|
||||
errors.append(
|
||||
f"{label}: reservation '{rdesc}' ip '{rip}' conflicts "
|
||||
f"with a server_identity."
|
||||
)
|
||||
|
||||
if rmac:
|
||||
if rmac in seen_res_macs:
|
||||
errors.append(
|
||||
f"{label}: reservation '{rdesc}' MAC '{rmac}' duplicates "
|
||||
f"'{seen_res_macs[rmac]}'."
|
||||
)
|
||||
else:
|
||||
seen_res_macs[rmac] = rdesc
|
||||
|
||||
for bl_name in vlan.get("use_blocklists", []):
|
||||
if bl_name not in blocklists_by_name:
|
||||
errors.append(f"{label}: use_blocklists references unknown blocklist '{bl_name}'.")
|
||||
|
||||
# -- NAT / firewall validation ---------------------------------------------
|
||||
valid_protos = VALID_PROTOCOLS
|
||||
known_interfaces = set(seen_interfaces.keys())
|
||||
|
||||
def nat_check_port(label, port):
|
||||
try:
|
||||
p = int(port)
|
||||
if not (1 <= p <= 65535):
|
||||
errors.append(f"{label}: port {port} is out of valid range (1-65535).")
|
||||
except (TypeError, ValueError):
|
||||
errors.append(f"{label}: '{port}' is not a valid port number.")
|
||||
|
||||
def nat_check_ip(label, ip_str):
|
||||
try:
|
||||
return ipaddress.IPv4Address(ip_str)
|
||||
except ValueError:
|
||||
errors.append(f"{label}: '{ip_str}' is not a valid IPv4 address.")
|
||||
return None
|
||||
|
||||
def nat_check_ip_in_network(label, ip_str, network):
|
||||
ip = nat_check_ip(label, ip_str)
|
||||
if ip and ip not in network:
|
||||
errors.append(f"{label}: '{ip_str}' is not within subnet {network}.")
|
||||
|
||||
for vlan in data["vlans"]:
|
||||
name = vlan.get("name", "?")
|
||||
iface = vlan.get("interface", "")
|
||||
net = vlan_networks.get(iface)
|
||||
|
||||
for r in vlan.get("port_wrangling", []):
|
||||
desc = r.get("description", "?")
|
||||
label = f"vlan '{name}' port_wrangling '{desc}'"
|
||||
if r.get("protocol") not in valid_protos:
|
||||
errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
|
||||
f"Must be tcp, udp, or both.")
|
||||
nat_check_port(f"{label} dest_port", r.get("dest_port"))
|
||||
if net:
|
||||
nat_check_ip_in_network(f"{label} redirect_to", r.get("redirect_to", ""), net)
|
||||
|
||||
# -- port_forwarding validation (top-level) --------------------------------
|
||||
for idx, r in enumerate(data.get("port_forwarding", [])):
|
||||
desc = r.get("description", "?")
|
||||
label = f"port_forwarding[{idx}] '{desc}'"
|
||||
if r.get("protocol") not in valid_protos:
|
||||
errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
|
||||
f"Must be tcp, udp, or both.")
|
||||
nat_check_port(f"{label} dest_port", r.get("dest_port"))
|
||||
nat_check_port(f"{label} nat_port", r.get("nat_port"))
|
||||
nat_check_ip(f"{label} nat_ip", r.get("nat_ip", ""))
|
||||
|
||||
for r in data.get("inter_vlan_exceptions", []):
|
||||
desc = r.get("description", "?")
|
||||
label = f"inter_vlan_exceptions '{desc}'"
|
||||
if r.get("protocol") not in valid_protos:
|
||||
errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
|
||||
f"Must be tcp, udp, or both.")
|
||||
if "src_ip_or_subnet" not in r:
|
||||
errors.append(f"{label}: missing field 'src_ip_or_subnet'.")
|
||||
else:
|
||||
val = r["src_ip_or_subnet"]
|
||||
try:
|
||||
ipaddress.IPv4Address(val)
|
||||
except ValueError:
|
||||
try:
|
||||
ipaddress.IPv4Network(val, strict=False)
|
||||
except ValueError:
|
||||
errors.append(f"{label}: src_ip_or_subnet '{val}' is not a valid "
|
||||
f"IPv4 address or network.")
|
||||
# Support both dst_ip (legacy, single IP) and dst_ip_or_subnet (IP or subnet)
|
||||
dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "")
|
||||
if not dst:
|
||||
errors.append(f"{label}: missing field 'dst_ip_or_subnet'.")
|
||||
else:
|
||||
try:
|
||||
ipaddress.IPv4Address(dst)
|
||||
except ValueError:
|
||||
try:
|
||||
ipaddress.IPv4Network(dst, strict=False)
|
||||
except ValueError:
|
||||
errors.append(f"{label}: dst_ip_or_subnet '{dst}' is not a valid "
|
||||
f"IPv4 address or network.")
|
||||
if r.get("dst_port") is not None:
|
||||
nat_check_port(f"{label} dst_port", r.get("dst_port"))
|
||||
|
||||
# -- radius_default uniqueness check ---------------------------------------
|
||||
defaults = [v["name"] for v in data["vlans"] if v.get("radius_default") is True]
|
||||
if len(defaults) > 1:
|
||||
errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). "
|
||||
f"Only one VLAN may be the RADIUS default.")
|
||||
|
||||
# -- host_overrides validation ---------------------------------------------
|
||||
all_vlan_nets = list(vlan_networks.values())
|
||||
for idx, entry in enumerate(data.get("host_overrides", [])):
|
||||
lbl = f"host_overrides[{idx}] '{entry.get('host', '?')}'"
|
||||
if not entry.get("host"):
|
||||
errors.append(f"{lbl}: missing 'host' field.")
|
||||
ip_str = entry.get("ip", "")
|
||||
if not ip_str:
|
||||
errors.append(f"{lbl}: missing 'ip' field.")
|
||||
else:
|
||||
try:
|
||||
ip_addr = ipaddress.IPv4Address(ip_str)
|
||||
if all_vlan_nets and not any(ip_addr in net for net in all_vlan_nets):
|
||||
errors.append(
|
||||
f"{lbl}: '{ip_str}' does not fall within any configured VLAN subnet."
|
||||
)
|
||||
except ValueError:
|
||||
errors.append(f"{lbl}: '{ip_str}' is not a valid IPv4 address.")
|
||||
|
||||
# -- banned_ips validation -------------------------------------------------
|
||||
for idx, entry in enumerate(data.get("banned_ips", [])):
|
||||
ip = entry.get("ip", "")
|
||||
lbl = f"banned_ips[{idx}] '{entry.get('description', '')}'"
|
||||
if not ip:
|
||||
errors.append(f"{lbl}: missing 'ip' field.")
|
||||
continue
|
||||
try:
|
||||
expand_banned_ip(ip)
|
||||
except ValueError as e:
|
||||
errors.append(f"{lbl}: {e}")
|
||||
|
||||
if errors:
|
||||
print("Validation failed:", file=sys.stderr)
|
||||
for e in errors:
|
||||
print(f" - {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# ===================================================================
|
||||
# Build systemd-networkd files
|
||||
# ===================================================================
|
||||
|
|
@ -3759,7 +3273,12 @@ def main():
|
|||
sys.exit(1)
|
||||
|
||||
data = load_config()
|
||||
validate_config(data)
|
||||
errors = validate_config(data)
|
||||
if errors:
|
||||
print("Validation failed:", file=sys.stderr)
|
||||
for e in errors:
|
||||
print(f" - {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
general = data.get("general", {})
|
||||
setup_logging(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue