UI improvements and input validations

This commit is contained in:
Matthew Grotke 2026-05-20 04:06:50 -04:00
parent b8c4914a52
commit 270856b391
22 changed files with 1548 additions and 302 deletions

View file

@ -4,7 +4,7 @@ import re
from flask import Blueprint, make_response, redirect, flash, request
from auth import require_level
from config_utils import load_core, save_core, verify_core_hash, apply_msg, CONFIGS_DIR
from config_utils import load_core, save_core, verify_core_hash, queued_msg, CONFIGS_DIR
import sanitize
import validate
@ -19,6 +19,24 @@ def _wg_vlan(core):
return next((v for v in core.get('vlans', []) if v.get('is_vpn')), None)
def _wg_vlan_by_name(core, name):
return next((v for v in core.get('vlans', []) if v.get('is_vpn') and v.get('name') == name), None)
def _find_peer_by_flat_idx(core, flat_idx):
"""Return (vlan, peer_list_index) by flat index across all VPN VLANs in order."""
i = 0
for vlan in core.get('vlans', []):
if not vlan.get('is_vpn'):
continue
peers = vlan.get('peers', [])
for j in range(len(peers)):
if i == flat_idx:
return vlan, j
i += 1
return None, None
def _wg_iface(vlan, core):
"""Return the WireGuard interface name (wg0, wg1, ...) for a VPN VLAN."""
wg_vlans = [v for v in core.get('vlans', []) if v.get('is_vpn')]
@ -180,7 +198,7 @@ def apply_vpn():
overrides.pop('mtu', None)
save_core(core)
flash(apply_msg(), 'success')
flash(queued_msg('core apply'), 'success')
return redirect(_VIEW)
@ -188,12 +206,17 @@ def apply_vpn():
@require_level('administrator')
def add_vpn_peer():
peer_name = sanitize.name(request.form.get('peer_name', ''))
peer_vlan_nm = request.form.get('peer_vlan', '').strip()
peer_ip_raw = request.form.get('peer_ip', '').strip()
split_tunnel = 'split_tunnel' in request.form
enabled = 'enabled' in request.form
if not peer_name:
flash('Peer name is required.', 'error')
return redirect(_VIEW)
if not peer_vlan_nm:
flash('Assigned VLAN is required.', 'error')
return redirect(_VIEW)
peer_ip = validate.ip(peer_ip_raw)
if not peer_ip:
flash(f'"{peer_ip_raw}" is not a valid IP address.', 'error')
@ -203,18 +226,29 @@ def add_vpn_peer():
return redirect(_VIEW)
core = load_core()
vpn_vlan = _wg_vlan(core)
vpn_vlan = _wg_vlan_by_name(core, peer_vlan_nm)
if vpn_vlan is None:
flash('No WireGuard VLAN found in configuration.', 'error')
flash(f'VPN VLAN "{peer_vlan_nm}" not found.', 'error')
return redirect(_VIEW)
try:
network = ipaddress.IPv4Network(f"{vpn_vlan['subnet']}/{vpn_vlan['subnet_mask']}", strict=False)
if ipaddress.IPv4Address(peer_ip) not in network:
flash(f'{peer_ip} is not within the subnet {vpn_vlan["subnet"]}/{vpn_vlan["subnet_mask"]} of {peer_vlan_nm}.', 'error')
return redirect(_VIEW)
except Exception:
pass
peers = vpn_vlan.setdefault('peers', [])
if any(p.get('name') == peer_name for p in peers):
flash(f'A peer named "{peer_name}" already exists.', 'error')
return redirect(_VIEW)
if any(p.get('ip') == peer_ip for p in peers):
flash(f'IP address {peer_ip} is already assigned to another peer.', 'error')
return redirect(_VIEW)
for v in core.get('vlans', []):
if not v.get('is_vpn'):
continue
if any(p.get('ip') == peer_ip for p in v.get('peers', [])):
flash(f'IP address {peer_ip} is already assigned to another peer.', 'error')
return redirect(_VIEW)
private_key, public_key = _generate_wg_keypair()
peers.append({
@ -222,7 +256,7 @@ def add_vpn_peer():
'ip': peer_ip,
'public_key': public_key,
'split_tunnel': split_tunnel,
'enabled': True,
'enabled': enabled,
})
save_core(core)
@ -232,8 +266,8 @@ def add_vpn_peer():
@bp.route('/action/edit_vpn_peer', methods=['POST'])
@require_level('administrator')
def edit_vpn_peer():
idx = _row_index()
if idx is None:
flat_idx = _row_index()
if flat_idx is None:
flash('Invalid request.', 'error')
return redirect(_VIEW)
@ -248,105 +282,86 @@ def edit_vpn_peer():
return redirect(_VIEW)
core = load_core()
vpn_vlan = _wg_vlan(core)
if vpn_vlan is None:
flash('No WireGuard VLAN found.', 'error')
return redirect(_VIEW)
peers = vpn_vlan.get('peers', [])
if idx < 0 or idx >= len(peers):
vlan, peer_idx = _find_peer_by_flat_idx(core, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
return redirect(_VIEW)
# Reject duplicate name if it belongs to a different peer
if any(i != idx and p.get('name') == peer_name for i, p in enumerate(peers)):
peers = vlan.get('peers', [])
if any(j != peer_idx and p.get('name') == peer_name for j, p in enumerate(peers)):
flash(f'A peer named "{peer_name}" already exists.', 'error')
return redirect(_VIEW)
peers[idx].update({'name': peer_name, 'split_tunnel': split_tunnel, 'enabled': enabled})
peers[peer_idx].update({'name': peer_name, 'split_tunnel': split_tunnel, 'enabled': enabled})
save_core(core)
flash(apply_msg(), 'success')
flash(queued_msg('core apply'), 'success')
return redirect(_VIEW)
@bp.route('/action/toggle_vpn_peer', methods=['POST'])
@require_level('administrator')
def toggle_vpn_peer():
idx = _row_index()
if idx is None:
flat_idx = _row_index()
if flat_idx is None:
flash('Invalid request.', 'error')
return redirect(_VIEW)
if not _hash_ok():
return redirect(_VIEW)
core = load_core()
vpn_vlan = _wg_vlan(core)
if vpn_vlan is None:
flash('No WireGuard VLAN found.', 'error')
return redirect(_VIEW)
peers = vpn_vlan.get('peers', [])
if idx < 0 or idx >= len(peers):
vlan, peer_idx = _find_peer_by_flat_idx(core, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
return redirect(_VIEW)
peers[idx]['enabled'] = not peers[idx].get('enabled', True)
peers = vlan.get('peers', [])
peers[peer_idx]['enabled'] = not peers[peer_idx].get('enabled', True)
save_core(core)
flash(apply_msg(), 'success')
flash(queued_msg('core apply'), 'success')
return redirect(_VIEW)
@bp.route('/action/delete_vpn_peer', methods=['POST'])
@require_level('administrator')
def delete_vpn_peer():
idx = _row_index()
if idx is None:
flat_idx = _row_index()
if flat_idx is None:
flash('Invalid request.', 'error')
return redirect(_VIEW)
if not _hash_ok():
return redirect(_VIEW)
core = load_core()
vpn_vlan = _wg_vlan(core)
if vpn_vlan is None:
flash('No WireGuard VLAN found.', 'error')
return redirect(_VIEW)
peers = vpn_vlan.get('peers', [])
if idx < 0 or idx >= len(peers):
vlan, peer_idx = _find_peer_by_flat_idx(core, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
return redirect(_VIEW)
peers.pop(idx)
vlan.get('peers', []).pop(peer_idx)
save_core(core)
flash(apply_msg(), 'success')
flash(queued_msg('core apply'), 'success')
return redirect(_VIEW)
@bp.route('/action/regenerate_vpn_peer', methods=['POST'])
@require_level('administrator')
def regenerate_vpn_peer():
idx = _row_index()
if idx is None:
flat_idx = _row_index()
if flat_idx is None:
flash('Invalid request.', 'error')
return redirect(_VIEW)
if not _hash_ok():
return redirect(_VIEW)
core = load_core()
vpn_vlan = _wg_vlan(core)
if vpn_vlan is None:
flash('No WireGuard VLAN found.', 'error')
return redirect(_VIEW)
peers = vpn_vlan.get('peers', [])
if idx < 0 or idx >= len(peers):
vlan, peer_idx = _find_peer_by_flat_idx(core, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
return redirect(_VIEW)
private_key, public_key = _generate_wg_keypair()
peer = peers[idx]
peer = vlan['peers'][peer_idx]
peer['public_key'] = public_key
save_core(core)
return _conf_response(vpn_vlan, peer['name'], peer['ip'], private_key)
return _conf_response(vlan, peer['name'], peer['ip'], private_key)