linuxrouter/routlin/validation.py
2026-05-27 15:29:23 -04:00

751 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
validation.py -- Shared structural validators for config.json fields.
Lives alongside core.py in ~/routlin/ and is volume-mounted into the
routlin-dash container at /app/validation.py. Importable by both
core.py (router host) and the Flask app directly.
Convention: primitive validators accept a raw string and return the
normalised valid value, or '' / None if the input is invalid.
"""
import ipaddress
import os
import re
# Accepted values for the 'protocol' field of port_wrangling,
# port_forwarding and inter_vlan_exceptions rules (see validate_config).
VALID_PROTOCOLS = {'tcp', 'udp', 'both'}
# Accepted values for the 'format' field of a dns_blocking blocklist entry.
VALID_BLOCKLIST_FORMATS = {'dnsmasq', 'hosts'}
# Dynamic-DNS provider identifiers. Not referenced by the code visible in
# this module; presumably consumed by importers -- TODO confirm.
VALID_DDNS_PROVIDERS = ('noip', 'cloudflare', 'duckdns')
# ===================================================================
# IP / CIDR
# ===================================================================
def ip(value):
    """Validate an IP address of either family.

    Returns the whitespace-stripped text of ``value`` when it parses as an
    IPv4 or IPv6 address. Returns '' for falsy input and for anything that
    does not parse.
    """
    candidate = str(value).strip() if value else ''
    if not candidate:
        # Nothing left to parse (None, '', whitespace only, 0, ...).
        return ''
    try:
        ipaddress.ip_address(candidate)
    except ValueError:
        return ''
    return candidate
def ip_or_cidr(value):
    """Validate an IPv4/IPv6 address or a CIDR network of either family.

    Returns the whitespace-stripped text of ``value`` when it parses as a
    single address or as a network (host bits are allowed), else ''.
    """
    if not value:
        return ''
    text = str(value).strip()
    parsers = (
        ipaddress.ip_address,
        lambda s: ipaddress.ip_network(s, strict=False),
    )
    # Try the address form first, then the network form.
    for parse in parsers:
        try:
            parse(text)
        except ValueError:
            continue
        return text
    return ''
def ipv4(value):
    """Validate an IPv4 address.

    Returns the whitespace-stripped text of ``value`` when it parses as an
    IPv4 address, else ''. Falsy input yields ''.
    """
    if not value:
        return ''
    text = str(value).strip()
    try:
        ipaddress.IPv4Address(text)
    except ValueError:
        text = ''
    return text
def ipv4_or_cidr(value):
    """Validate an IPv4 address or an IPv4 CIDR network.

    Returns the whitespace-stripped text of ``value`` when it parses as an
    IPv4 address or as an IPv4 network (host bits are allowed), else ''.
    """
    if not value:
        return ''
    text = str(value).strip()
    try:
        ipaddress.IPv4Address(text)
    except ValueError:
        # Not a plain address; fall back to the network form.
        try:
            ipaddress.IPv4Network(text, strict=False)
        except ValueError:
            return ''
    return text
# ===================================================================
# Port
# ===================================================================
def port(value):
    """Return the port as a decimal string if it is within 1-65535, else ''.

    Normalisation: every character of ``str(value)`` other than the ASCII
    digits 0-9 is discarded before parsing, so ' 80 ' and '8a0' both yield
    '80'. A sign is discarded too, so '-80' also yields '80'. Input that
    contains no digits at all yields ''.
    """
    try:
        digits_only = re.sub(r'[^0-9]', '', str(value))
        number = int(digits_only)
    except (ValueError, TypeError):
        return ''
    return str(number) if 1 <= number <= 65535 else ''
# ===================================================================
# Time
# ===================================================================
def time_24h(value):
    """Return value if it is a valid 24-hour HH:MM time string, else ''.

    The input is converted to str and stripped of surrounding whitespace.
    Hours must be 00-23 and minutes 00-59, both written with exactly two
    ASCII digits. Falsy input yields ''.
    """
    if not value:
        return ''
    v = str(value).strip()
    # Explicit [0-9] classes rather than \d: on str patterns \d matches any
    # Unicode decimal digit, which would let non-ASCII digit strings through.
    if re.fullmatch(r'([01][0-9]|2[0-3]):[0-5][0-9]', v):
        return v
    return ''
# ===================================================================
# Integer range
# ===================================================================
def int_range(value, lo, hi):
    """Parse ``value`` as an integer and check it against [lo, hi].

    ``value`` is converted with ``int(str(value).strip())``. Either bound
    may be None, meaning that side is unbounded. Returns the parsed int
    when it lies within the bounds; returns None when parsing fails or the
    number is out of range.
    """
    try:
        number = int(str(value).strip())
        if lo is not None and not number >= lo:
            return None
        if hi is not None and not number <= hi:
            return None
        return number
    except (ValueError, TypeError):
        return None
# ===================================================================
# Domain name
# ===================================================================
def domainname(value):
    """Return the normalised domain name if value is valid, else ''.

    The input is converted to str, stripped of surrounding whitespace and
    lower-cased; that normalised form is what gets returned.

    Rules: labels separated by dots; each label contains only ASCII
    letters, digits, and hyphens; no label may start or end with a
    hyphen; no empty labels (so no leading, trailing or consecutive
    dots); each label is at most 63 characters; total length <= 253.
    """
    if not value:
        return ''
    v = str(value).strip().lower()
    if len(v) > 253:
        return ''
    for label in v.split('.'):
        # An empty label covers '..', a leading dot and a trailing dot.
        if not label or len(label) > 63:
            return ''
        # fullmatch, not match + '$': '$' also matches just before a
        # trailing newline, which would let a label such as 'foo\n' through.
        if not re.fullmatch(r'[a-z0-9]([a-z0-9-]*[a-z0-9])?', label):
            return ''
    return v
# ===================================================================
# Banned-IP pattern
# ===================================================================
def banned_ip(value):
    """
    Validate a banned_ip pattern; return it stripped, or '' when invalid.

    Accepted formats (mirrors core.py expand_banned_ip):
      IPv4:
        Single address   192.0.2.1
        CIDR             192.0.2.0/24
        Wildcard octet   192.0.2.*
        Octet range      192.0.2.10-20
        (combinations that expand to <=1024 entries are accepted)
      IPv6:
        Single address     2001:db8::1
        CIDR               2001:db8::/32
        Trailing wildcard  2001:db8:c17:*

    Falsy input yields ''. The checking itself is delegated to
    _check_banned_ip, which signals rejection by raising.
    """
    if not value:
        return ''
    pattern = str(value).strip()
    try:
        _check_banned_ip(pattern)
    except (ValueError, TypeError):
        return ''
    return pattern
def _check_banned_ip(ip_str):
    """Route a banned-IP pattern to the IPv6 or IPv4 checker.

    Any pattern containing ':' is treated as IPv6; everything else as IPv4.
    Returns None; the selected checker raises on an invalid pattern.
    """
    checker = _check_banned_ipv6 if ':' in ip_str else _check_banned_ipv4
    checker(ip_str)
def _check_banned_ipv4(ip_str):
if '/' in ip_str:
ipaddress.IPv4Network(ip_str, strict=False)
return
parts = ip_str.split('.')
if len(parts) != 4:
raise ValueError(f"Expected 4 octets: {ip_str!r}")
def parse_octet(s):
if s == '*':
return (0, 255)
if '-' in s:
a, b = s.split('-', 1)
lo, hi = int(a), int(b)
if not (0 <= lo <= hi <= 255):
raise ValueError(f"Invalid octet range {s!r}")
return (lo, hi)
v = int(s)
if not 0 <= v <= 255:
raise ValueError(f"Octet {v} out of 0-255")
return (v, v)
ranges = [parse_octet(p) for p in parts]
trailing = 0
for lo, hi in reversed(ranges):
if lo == 0 and hi == 255:
trailing += 1
else:
break
total = 1
for lo, hi in ranges[:4 - trailing]:
total *= (hi - lo + 1)
if total > 1024:
raise ValueError(f"Pattern expands to {total} entries (limit 1024); use CIDR")
def _check_banned_ipv6(ip_str):
if '/' in ip_str:
ipaddress.IPv6Network(ip_str, strict=False)
return
if '*' not in ip_str:
ipaddress.IPv6Address(ip_str)
return
if not ip_str.endswith(':*'):
raise ValueError(f"Unsupported IPv6 wildcard: {ip_str!r}; use 'prefix:*' or CIDR")
prefix_part = ip_str[:-2]
if '::' in prefix_part:
left, right = prefix_part.split('::', 1)
lg = [g for g in left.split(':') if g] if left else []
rg = [g for g in right.split(':') if g] if right else []
zeros = 8 - len(lg) - len(rg) - 1
if zeros < 0:
raise ValueError(f"Too many groups in {ip_str!r}")
groups = lg + ['0000'] * zeros + rg
else:
groups = [g for g in prefix_part.split(':') if g]
if not (1 <= len(groups) <= 7):
raise ValueError(f"IPv6 wildcard must have 1-7 prefix groups: {ip_str!r}")
# ===================================================================
# VLAN / interface helpers (shared with core.py apply logic)
# ===================================================================
def is_wg(vlan):
    """Return the VLAN's ``is_vpn`` value, or False when the key is absent.

    validate_config treats VLANs for which this is truthy as WireGuard
    interfaces.
    """
    vpn_flag = vlan.get("is_vpn", False)
    return vpn_flag
def is_dynamic_ip(r):
    """Return True if a reservation has no pinned IP (DHCP assigns from pool).

    A reservation counts as dynamic when its 'ip' key is missing, None,
    the empty string, or the literal 'dynamic'.
    """
    pinned = r.get("ip", "dynamic")
    if pinned is None:
        return True
    return pinned in ("", "dynamic")
def derive_vlan_id(subnet, prefix):
    """Return the VLAN ID derived from the network address, or None.

    The ID is the value of the last network-address octet that the prefix
    reaches into (for example the third octet for a /24, the second for a
    /16).

    Args:
        subnet: IPv4 address inside the network; host bits are ignored.
        prefix: Prefix length (24 or '24') or any mask form that
            ipaddress.IPv4Network accepts, such as '255.255.255.0'.

    Returns:
        The derived ID when it lies within 1-4094, otherwise None. None is
        also returned when subnet/prefix do not form a valid IPv4 network.
    """
    try:
        network = ipaddress.IPv4Network(f'{subnet}/{prefix}', strict=False)
    except (ValueError, TypeError):
        return None
    # Take the prefix length from the parsed network rather than
    # int(prefix), so that dotted netmasks work as well.
    prefix_len = network.prefixlen
    if prefix_len == 0:
        # A /0 network has no network octet to derive an ID from.
        return None
    byte_idx = (prefix_len - 1) // 8
    vlan_id = network.network_address.packed[byte_idx]
    if 1 <= vlan_id <= 4094:
        return vlan_id
    return None
def derive_interface(vlan, data):
    """Derive the interface name for a VLAN without mutating data.

    WireGuard VLANs are named 'wg<N>', where N is the VLAN's position
    among all WireGuard VLANs in ``data`` sorted by vlan_id (entries
    without a vlan_id sort last). ``vlan`` is located by identity; if it is
    not found, 'wg0' is returned.

    Other VLANs use the LAN interface name itself when vlan_id == 1 and
    '<lan>.<vlan_id>' otherwise. The LAN interface defaults to 'eth0'.
    """
    lan = data.get('network_interfaces', {}).get('lan_interface', 'eth0')

    if not is_wg(vlan):
        vid = vlan.get('vlan_id')
        if vid == 1:
            return lan
        return f'{lan}.{vid}'

    def by_vlan_id(entry):
        # Entries without a vlan_id are ordered after those that have one.
        entry_id = entry.get('vlan_id')
        return (entry_id is None, entry_id or 0)

    tunnels = sorted((v for v in data.get('vlans', []) if is_wg(v)), key=by_vlan_id)
    for position, candidate in enumerate(tunnels):
        if candidate is vlan:
            return f'wg{position}'
    return 'wg0'
# ===================================================================
# Full config validation (shared with core.py --apply)
# ===================================================================
def validate_config(data):
    """Validate config.json structure and content.

    Checks, in order: upstream DNS servers, WAN/LAN interface names, the
    blocklist library, every VLAN (WireGuard VLANs and all other VLANs
    follow different rules), port_wrangling / port_forwarding /
    inter_vlan_exceptions rules, the RADIUS constraints, host_overrides
    and banned_ips. ``data`` is not mutated.

    Args:
        data: The parsed config.json as a dict.

    Returns:
        A list of human-readable error strings; empty when no problem was
        found.
    """
    errors = []
    seen_vlan_ids = {}
    seen_interfaces = {}
    seen_names = {}
    seen_listen_ports = {}
    # Pre-compute per-VLAN vlan_ids and interface names without mutating data
    _lan = data.get("network_interfaces", {}).get("lan_interface", "eth0")
    _all_vlans = data.get("vlans", [])
    _stored_ids = [_v.get("vlan_id") for _v in _all_vlans]
    # WireGuard VLANs are numbered wg0, wg1, ... in vlan_id order (VLANs
    # without a vlan_id sort last).
    _wg_sorted = sorted(
        [(i, _stored_ids[i]) for i, _v in enumerate(_all_vlans) if is_wg(_v)],
        key=lambda x: (x[1] is None, x[1] or 0)
    )
    _wg_order = {orig_i: wg_idx for wg_idx, (orig_i, _) in enumerate(_wg_sorted)}
    vlan_ifaces = []
    for i, _vlan in enumerate(_all_vlans):
        if is_wg(_vlan):
            vlan_ifaces.append(f"wg{_wg_order[i]}")
        else:
            _vid = _stored_ids[i]
            vlan_ifaces.append(_lan if _vid == 1 else f"{_lan}.{_vid}")

    # upstream_dns block ============================================
    if not data.get("upstream_dns", {}).get("upstream_servers"):
        errors.append("upstream_dns.upstream_servers is missing or empty.")

    # WAN / LAN interfaces ==========================================
    gen = data.get("network_interfaces", {})
    wan = gen.get("wan_interface", "")
    lan = gen.get("lan_interface", "")
    if not wan:
        errors.append("network_interfaces.wan_interface is missing or empty.")
    if not lan:
        errors.append("network_interfaces.lan_interface is missing or empty.")
    if wan and lan:
        available_interfaces = set()
        try:
            available_interfaces = set(os.listdir('/sys/class/net'))
        except Exception:
            # Best effort: if the interface list cannot be read, the
            # existence checks below are skipped.
            pass
        if available_interfaces:
            if wan not in available_interfaces:
                errors.append(f"network_interfaces.wan_interface: '{wan}' does not exist on this system.")
            if lan not in available_interfaces:
                errors.append(f"network_interfaces.lan_interface: '{lan}' does not exist on this system.")
        if wan == lan:
            errors.append(f"network_interfaces.wan_interface and network_interfaces.lan_interface must be different (both set to '{wan}').")

    # Blocklist library =============================================
    blocklists_by_name = {}
    for idx, bl in enumerate(data.get("dns_blocking", {}).get("blocklists", [])):
        name = bl.get("name", "")
        label = f"dns_blocking.blocklists[{idx}] '{name}'"
        for field in ("name", "description", "save_as", "url", "format"):
            if not bl.get(field):
                errors.append(f"{label}: missing or empty field '{field}'.")
        if bl.get("format") and bl["format"] not in VALID_BLOCKLIST_FORMATS:
            errors.append(f"{label}: format must be one of: {', '.join(sorted(VALID_BLOCKLIST_FORMATS))}.")
        if name:
            if name in blocklists_by_name:
                errors.append(f"{label}: duplicate blocklist name '{name}'.")
            else:
                blocklists_by_name[name] = bl

    # Per-VLAN validation ===========================================
    vlan_networks = {}  # iface -> IPv4Network (used for NAT section)
    for i, (vlan, iface) in enumerate(zip(_all_vlans, vlan_ifaces)):
        vlan_id = _stored_ids[i]
        name = vlan.get("name", "?")
        label = f"vlan '{name}' (id={vlan_id})"
        # bool is a subclass of int, so reject it explicitly; otherwise a
        # JSON `true` would be accepted as VLAN 1.
        if isinstance(vlan_id, bool) or not isinstance(vlan_id, int) or not (1 <= vlan_id <= 4094):
            errors.append(f"vlan '{name}': vlan_id must be an integer 1-4094 (got {vlan_id!r}).")
        if name in seen_names:
            errors.append(f"{label}: duplicate vlan name '{name}' "
                          f"(also used by id={seen_names[name]}).")
        else:
            seen_names[name] = vlan_id
        if vlan_id in seen_vlan_ids:
            errors.append(f"{label}: duplicate vlan_id {vlan_id} "
                          f"(also used by '{seen_vlan_ids[vlan_id]}').")
        else:
            seen_vlan_ids[vlan_id] = name
        if iface in seen_interfaces:
            errors.append(f"{label}: duplicate interface '{iface}' "
                          f"(also used by '{seen_interfaces[iface]}').")
        else:
            seen_interfaces[iface] = name
        if vlan.get("mdns_reflection") is True and is_wg(vlan):
            errors.append(f"{label}: mdns_reflection must be false for WireGuard interfaces.")

        if is_wg(vlan):
            # vpn_information =======================================
            vpi = vlan.get("vpn_information")
            if not isinstance(vpi, dict):
                errors.append(f"{label}: vpn_information must be a plain object.")
                vpi = {}
            else:
                lp = vpi.get("listen_port")
                # Track ports by their parsed integer value so that 51820
                # and "51820" are recognised as the same port.
                lp_num = int_range(lp, 1, 65535)
                if lp_num is None:
                    errors.append(f"{label}: vpn_information.listen_port must be an integer 1-65535.")
                elif lp_num in seen_listen_ports:
                    errors.append(f"{label}: vpn_information.listen_port {lp} is already used by "
                                  f"'{seen_listen_ports[lp_num]}'.")
                else:
                    seen_listen_ports[lp_num] = name

            # subnet/subnet_mask ====================================
            for field in ("subnet", "subnet_mask"):
                if not vlan.get(field):
                    errors.append(f"{label}: missing required field '{field}'.")
            wg_net = None
            if vlan.get("subnet") and vlan.get("subnet_mask"):
                try:
                    wg_net = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
                    vlan_networks[iface] = wg_net
                except ValueError as e:
                    errors.append(f"{label}: invalid subnet/subnet_mask: {e}")

            # server_identities =====================================
            if not vlan.get("server_identities"):
                errors.append(f"{label}: server_identities is empty or missing.")
            identity_ips = []
            for idx, ident in enumerate(vlan.get("server_identities", [])):
                ip_str = ident.get("ip", "")
                ilabel = f"{label} server_identities[{idx}] '{ident.get('description', '?')}'"
                if not ip_str:
                    errors.append(f"{ilabel}: missing 'ip' field.")
                    continue
                if not ipv4(ip_str):
                    errors.append(f"{ilabel}: ip '{ip_str}' is not a valid IPv4 address.")
                    continue
                ip_addr = ipaddress.IPv4Address(ip_str)
                if wg_net and ip_addr not in wg_net:
                    errors.append(f"{ilabel}: ip '{ip_str}' is not within subnet {wg_net}.")
                else:
                    identity_ips.append(ip_addr)

            # vpn_information.explicit_overrides ====================
            eo = vpi.get("explicit_overrides", {}) if isinstance(vpi, dict) else {}
            if not isinstance(eo, dict):
                errors.append(f"{label}: vpn_information.explicit_overrides must be a plain object.")
            else:
                gw = eo.get("gateway", "")
                if gw:
                    if not ipv4(gw):
                        errors.append(f"{label}: vpn_information.explicit_overrides.gateway '{gw}' is not a valid IPv4 address.")
                    else:
                        gw_ip = ipaddress.IPv4Address(gw)
                        if identity_ips and gw_ip not in identity_ips:
                            errors.append(
                                f"{label}: vpn_information.explicit_overrides.gateway '{gw}' does not match "
                                f"any server_identity IP. Must be one of: "
                                f"{[str(a) for a in identity_ips]}."
                            )
                dns = eo.get("dns_server", "")
                if dns and not ipv4(dns):
                    errors.append(f"{label}: vpn_information.explicit_overrides.dns_server '{dns}' is not a valid IPv4 address.")
                mtu = eo.get("mtu", "")
                if mtu and int_range(mtu, 576, 9000) is None:
                    errors.append(f"{label}: vpn_information.explicit_overrides.mtu '{mtu}' must be an integer in range 576-9000.")
            domain_val = vpi.get("domain", "") if isinstance(vpi, dict) else ""
            if domain_val and not domainname(domain_val):
                errors.append(f"{label}: vpn_information.domain '{domain_val}' is not a valid domain name.")

            # peers =================================================
            seen_peer_names = {}
            seen_peer_ips = {}
            for pidx, peer in enumerate(vlan.get("peers", [])):
                pname = peer.get("name", "")
                plabel = f"{label} peer[{pidx}] '{pname}'"
                if not pname:
                    errors.append(f"{plabel}: missing 'name' field.")
                elif pname in seen_peer_names:
                    errors.append(f"{plabel}: duplicate peer name '{pname}'.")
                else:
                    seen_peer_names[pname] = pidx
                if not peer.get("public_key"):
                    errors.append(f"{plabel}: missing 'public_key' field.")
                pip_str = peer.get("ip", "")
                if not pip_str:
                    errors.append(f"{plabel}: missing 'ip' field.")
                elif not ipv4(pip_str):
                    errors.append(f"{plabel}: ip '{pip_str}' is not a valid IPv4 address.")
                else:
                    pip = ipaddress.IPv4Address(pip_str)
                    if wg_net and pip not in wg_net:
                        errors.append(f"{plabel}: ip '{pip_str}' is not within subnet {wg_net}.")
                    if pip in identity_ips:
                        errors.append(f"{plabel}: ip '{pip_str}' conflicts with a server_identity.")
                    if pip_str in seen_peer_ips:
                        errors.append(
                            f"{plabel}: duplicate peer ip '{pip_str}' "
                            f"(also used by peer '{seen_peer_ips[pip_str]}')."
                        )
                    else:
                        seen_peer_ips[pip_str] = pname
            # The remaining per-VLAN checks do not apply to WireGuard VLANs.
            continue

        if not vlan.get("server_identities"):
            errors.append(f"{label}: server_identities is empty or missing.")
            continue
        for field in ("subnet", "subnet_mask"):
            if not vlan.get(field):
                errors.append(f"{label}: missing required top-level field '{field}'.")
        if not vlan.get("subnet") or not vlan.get("subnet_mask"):
            continue
        try:
            network = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
            vlan_networks[iface] = network
        except ValueError as e:
            errors.append(f"{label}: invalid subnet/subnet_mask: {e}")
            continue
        d = vlan.get("dhcp_information", {})
        required_dhcp = {"dynamic_pool_start", "dynamic_pool_end", "lease_time"}
        missing = required_dhcp - set(d.keys())
        if missing:
            errors.append(f"{label}: missing dhcp_information fields: {missing}")
            continue

        def check_ip(field_label, ip_str, allow_none=False):
            # Record an error for a missing, malformed or out-of-subnet
            # address. Returns the parsed address (even when it lies
            # outside the subnet) or None when it could not be parsed.
            if ip_str is None:
                if not allow_none:
                    errors.append(f"{label}: {field_label} is null/missing.")
                return None
            if not ipv4(ip_str):
                errors.append(f"{label}: {field_label} '{ip_str}' is not a valid IPv4 address.")
                return None
            addr = ipaddress.IPv4Address(ip_str)
            if addr not in network:
                errors.append(f"{label}: {field_label} '{ip_str}' is not within subnet {network}.")
            return addr

        identity_ips = []
        for idx, ident in enumerate(vlan["server_identities"]):
            ident_addr = check_ip(
                f"server_identities[{idx}] '{ident.get('description', '?')}'",
                ident.get("ip")
            )
            if ident_addr:
                identity_ips.append(ident_addr)

        # Validate explicit_overrides ===============================
        eo = d.get("explicit_overrides", {})
        if not isinstance(eo, dict):
            errors.append(f"{label}: explicit_overrides must be a plain object.")
        else:
            gw = eo.get("gateway", "")
            if gw:
                gw_ip = check_ip("explicit_overrides.gateway", gw)
                if gw_ip and gw_ip not in identity_ips:
                    errors.append(
                        f"{label}: explicit_overrides.gateway '{gw}' does not match "
                        f"any server_identity IP. Must be one of: "
                        f"{[str(a) for a in identity_ips]}."
                    )
            dns = eo.get("dns_server", "")
            if dns:
                check_ip("explicit_overrides.dns_server", dns)
            ntp = eo.get("ntp_server", "")
            if ntp:
                check_ip("explicit_overrides.ntp_server", ntp)

        pool_start = check_ip("dynamic_pool_start", d["dynamic_pool_start"])
        pool_end = check_ip("dynamic_pool_end", d["dynamic_pool_end"])
        if pool_start and pool_end and pool_start > pool_end:
            errors.append(
                f"{label}: dynamic_pool_start '{pool_start}' is greater than "
                f"dynamic_pool_end '{pool_end}'."
            )
        if pool_start and pool_end:
            for ident_addr in identity_ips:
                if pool_start <= ident_addr <= pool_end:
                    errors.append(
                        f"{label}: server_identity '{ident_addr}' falls inside the dynamic "
                        f"pool ({pool_start} - {pool_end})."
                    )

        seen_res_ips = {}
        seen_res_macs = {}
        for r in vlan.get("reservations", []):
            rdesc = r.get("description", "?")
            # `or ""` keeps a JSON null MAC from raising AttributeError.
            rmac = str(r.get("mac") or "").lower().strip()
            if is_dynamic_ip(r):
                rip = None
            else:
                rip = check_ip(f"reservation '{rdesc}' ip", r.get("ip"))
            if rip:
                if pool_start and pool_end and pool_start <= rip <= pool_end:
                    errors.append(
                        f"{label}: reservation '{rdesc}' ip '{rip}' falls inside "
                        f"the dynamic pool ({pool_start} - {pool_end})."
                    )
                rip_str = str(rip)
                if rip_str in seen_res_ips:
                    # Allow same IP for different MACs (multi-interface device)
                    if rmac and rmac in seen_res_ips[rip_str]:
                        errors.append(
                            f"{label}: reservation '{rdesc}' ip '{rip}' and MAC '{rmac}' "
                            f"duplicates '{seen_res_ips[rip_str][rmac]}'."
                        )
                    else:
                        seen_res_ips[rip_str][rmac] = rdesc
                else:
                    seen_res_ips[rip_str] = {rmac: rdesc}
                if rip in identity_ips:
                    errors.append(
                        f"{label}: reservation '{rdesc}' ip '{rip}' conflicts "
                        f"with a server_identity."
                    )
            if rmac:
                if rmac in seen_res_macs:
                    errors.append(
                        f"{label}: reservation '{rdesc}' MAC '{rmac}' duplicates "
                        f"'{seen_res_macs[rmac]}'."
                    )
                else:
                    seen_res_macs[rmac] = rdesc

        for bl_name in vlan.get("use_blocklists", []):
            if bl_name not in blocklists_by_name:
                errors.append(f"{label}: use_blocklists references unknown blocklist '{bl_name}'.")

    # NAT / firewall validation =====================================
    valid_protos = VALID_PROTOCOLS

    def nat_check_port(label, port_val):
        # Record an error unless port_val parses as an integer in 1-65535.
        if int_range(port_val, 1, 65535) is None:
            errors.append(f"{label}: '{port_val}' is not a valid port number (1-65535).")

    def nat_check_ip(label, ip_str):
        # Record an error and return None unless ip_str is an IPv4 address.
        if not ipv4(ip_str):
            errors.append(f"{label}: '{ip_str}' is not a valid IPv4 address.")
            return None
        return ipaddress.IPv4Address(ip_str)

    def nat_check_ip_in_network(label, ip_str, network):
        # As nat_check_ip, and additionally require membership in network.
        addr = nat_check_ip(label, ip_str)
        if addr and addr not in network:
            errors.append(f"{label}: '{ip_str}' is not within subnet {network}.")

    for vlan, iface in zip(data.get("vlans", []), vlan_ifaces):
        name = vlan.get("name", "?")
        net = vlan_networks.get(iface)
        for r in vlan.get("port_wrangling", []):
            desc = r.get("description", "?")
            label = f"vlan '{name}' port_wrangling '{desc}'"
            if r.get("protocol") not in valid_protos:
                errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
                              f"Must be tcp, udp, or both.")
            nat_check_port(f"{label} dest_port", r.get("dest_port"))
            # redirect_to is only checked when the VLAN's subnet parsed.
            if net:
                nat_check_ip_in_network(f"{label} redirect_to", r.get("redirect_to", ""), net)

    # port_forwarding validation (top-level) ========================
    for idx, r in enumerate(data.get("port_forwarding", [])):
        desc = r.get("description", "?")
        label = f"port_forwarding[{idx}] '{desc}'"
        if r.get("protocol") not in valid_protos:
            errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
                          f"Must be tcp, udp, or both.")
        nat_check_port(f"{label} dest_port", r.get("dest_port"))
        nat_check_port(f"{label} nat_port", r.get("nat_port"))
        nat_check_ip(f"{label} nat_ip", r.get("nat_ip", ""))

    for r in data.get("inter_vlan_exceptions", []):
        desc = r.get("description", "?")
        label = f"inter_vlan_exceptions '{desc}'"
        if r.get("protocol") not in valid_protos:
            errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
                          f"Must be tcp, udp, or both.")
        if "src_ip_or_subnet" not in r:
            errors.append(f"{label}: missing field 'src_ip_or_subnet'.")
        else:
            val = r["src_ip_or_subnet"]
            if not ipv4_or_cidr(val):
                errors.append(f"{label}: src_ip_or_subnet '{val}' is not a valid "
                              f"IPv4 address or network.")
        # 'dst_ip' is accepted as an alternative key for the destination.
        dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "")
        if not dst:
            errors.append(f"{label}: missing field 'dst_ip_or_subnet'.")
        else:
            if not ipv4_or_cidr(dst):
                errors.append(f"{label}: dst_ip_or_subnet '{dst}' is not a valid "
                              f"IPv4 address or network.")
        if r.get("dst_port") is not None:
            nat_check_port(f"{label} dst_port", r.get("dst_port"))

    # radius_default uniqueness check ===============================
    # .get() rather than ["name"]: an unnamed VLAN must produce a
    # validation error, not a KeyError.
    defaults = [
        str(v.get("name", "?"))
        for v in data.get("vlans", [])
        if v.get("radius_default") is True
    ]
    if len(defaults) > 1:
        errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). "
                      f"Only one VLAN may be the RADIUS default.")

    # RADIUS requires multiple VLANs ================================
    non_wg_vlans = [v for v in data.get("vlans", []) if not is_wg(v)]
    has_radius_clients = any(
        r.get("radius_client")
        for v in non_wg_vlans
        for r in v.get("reservations", [])
    )
    if has_radius_clients and len(non_wg_vlans) < 2:
        errors.append(
            "RADIUS clients are configured but only one non-VPN VLAN exists. "
            "Dynamic VLAN assignment requires at least two VLANs."
        )

    # host_overrides validation =====================================
    all_vlan_nets = list(vlan_networks.values())
    for idx, entry in enumerate(data.get("host_overrides", [])):
        lbl = f"host_overrides[{idx}] '{entry.get('host', '?')}'"
        if not entry.get("host"):
            errors.append(f"{lbl}: missing 'host' field.")
        ip_str = entry.get("ip", "")
        if not ip_str:
            errors.append(f"{lbl}: missing 'ip' field.")
        elif not ipv4(ip_str):
            errors.append(f"{lbl}: '{ip_str}' is not a valid IPv4 address.")
        else:
            ip_addr = ipaddress.IPv4Address(ip_str)
            # Skipped when no VLAN subnet could be parsed at all.
            if all_vlan_nets and not any(ip_addr in net for net in all_vlan_nets):
                errors.append(f"{lbl}: '{ip_str}' does not fall within any configured VLAN subnet.")

    # banned_ips validation =========================================
    for idx, entry in enumerate(data.get("banned_ips", [])):
        ip_val = entry.get("ip", "")
        lbl = f"banned_ips[{idx}] '{entry.get('description', '')}'"
        if not ip_val:
            errors.append(f"{lbl}: missing 'ip' field.")
            continue
        if not banned_ip(ip_val):
            errors.append(f"{lbl}: '{ip_val}' is not a valid IP, CIDR, or wildcard pattern.")
    return errors