Development

This commit is contained in:
Matthew Grotke 2026-05-27 22:04:04 -04:00
parent eed1d295dc
commit d9f3bd8289
45 changed files with 635 additions and 666 deletions

View file

@ -1,3 +1,4 @@
from pathlib import Path
import base64
import copy
import ipaddress
@ -9,9 +10,10 @@ from config_utils import load_config, save_config_with_snapshot, verify_config_h
import sanitize
import validation as validate
bp = Blueprint('vpn', __name__)
_PAGE = Path(__file__).parent.name
bp = Blueprint(_PAGE, __name__)
_VIEW = '/view/view_vpn'
_MTU_MIN = 576
_MTU_MAX = 9000
@ -121,7 +123,7 @@ def _conf_response(vlan, peer_name, peer_ip, private_key):
if not server_pub:
flash('Peer saved. Run sudo python3 ~/routlin/core.py --apply to generate the server '
'public key, then regenerate this peer to download the client config.', 'warning')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
conf = _build_client_conf(vlan, peer_name, peer_ip, private_key, server_pub)
safe = re.sub(r'[^A-Za-z0-9_\-]', '_', peer_name)
resp = make_response(conf)
@ -130,9 +132,9 @@ def _conf_response(vlan, peer_name, peer_ip, private_key):
return resp
@bp.route('/action/apply_vpn', methods=['POST'])
@bp.route('/action/vpn/wireguard_apply', methods=['POST'])
@require_level('administrator')
def apply_vpn():
def wireguard_apply():
listen_port_raw = request.form.get('vpn_listen_port', '').strip()
server_endpoint = validate.domainname(request.form.get('vpn_server_endpoint', ''))
domain = validate.domainname(request.form.get('vpn_domain', ''))
@ -141,39 +143,39 @@ def apply_vpn():
if not listen_port_raw:
flash('Listen port is required.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
listen_port = validate.int_range(listen_port_raw, 1, 65535)
if listen_port is None:
flash(f'"{listen_port_raw}" is not a valid port number (1-65535).', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
dns_server = ''
if dns_raw:
dns_server = validate.ip(dns_raw)
if not dns_server:
flash(f'"{dns_raw}" is not a valid IP address for DNS server.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
mtu = None
if mtu_raw:
mtu = validate.int_range(mtu_raw, _MTU_MIN, _MTU_MAX)
if mtu is None:
flash(f'"{mtu_raw}" is not a valid MTU (must be {_MTU_MIN}-{_MTU_MAX}).', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
cfg = load_config()
vpn_vlan = _wg_vlan(cfg)
if vpn_vlan is None:
flash('No WireGuard VLAN found in configuration.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
for v in cfg.get('vlans', []):
if v.get('is_vpn') and v is not vpn_vlan and v.get('vpn_information', {}).get('listen_port') == listen_port:
flash(f'Listen port {listen_port} is already used by another VPN VLAN.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
before_info = copy.deepcopy(vpn_vlan.get('vpn_information', {}))
info = vpn_vlan.setdefault('vpn_information', {})
@ -195,7 +197,7 @@ def apply_vpn():
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
vlan_name = vpn_vlan['name']
flash(save_config_with_snapshot(
@ -204,12 +206,12 @@ def apply_vpn():
before=before_info or None, after=copy.deepcopy(info),
description=f'Updated VPN configuration for {vlan_name}',
), 'success')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
@bp.route('/action/add_vpn_peer', methods=['POST'])
@bp.route('/action/vpn/addpeer_add', methods=['POST'])
@require_level('administrator')
def add_vpn_peer():
def addpeer_add():
peer_name = sanitize.name(request.form.get('peer_name', ''))
peer_vlan_nm = request.form.get('peer_vlan', '').strip()
peer_ip_raw = request.form.get('peer_ip', '').strip()
@ -218,41 +220,41 @@ def add_vpn_peer():
if not peer_name:
flash('Peer name is required.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
if not peer_vlan_nm:
flash('Assigned VLAN is required.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
peer_ip = validate.ip(peer_ip_raw)
if not peer_ip:
flash(f'"{peer_ip_raw}" is not a valid IP address.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
cfg = load_config()
vpn_vlan = _wg_vlan_by_name(cfg, peer_vlan_nm)
if vpn_vlan is None:
flash(f'VPN VLAN "{peer_vlan_nm}" not found.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
try:
network = ipaddress.IPv4Network(f"{vpn_vlan['subnet']}/{vpn_vlan['subnet_mask']}", strict=False)
if ipaddress.IPv4Address(peer_ip) not in network:
flash(f'{peer_ip} is not within the subnet {vpn_vlan["subnet"]}/{vpn_vlan["subnet_mask"]} of {peer_vlan_nm}.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
except Exception:
pass
peers = vpn_vlan.setdefault('peers', [])
if any(p.get('name') == peer_name for p in peers):
flash(f'A peer named "{peer_name}" already exists.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
for v in cfg.get('vlans', []):
if not v.get('is_vpn'):
continue
if any(p.get('ip') == peer_ip for p in v.get('peers', [])):
flash(f'IP address {peer_ip} is already assigned to another peer.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
private_key, public_key = _generate_wg_keypair()
entry = {
@ -267,7 +269,7 @@ def add_vpn_peer():
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
save_config_with_snapshot(
cfg,
@ -279,13 +281,13 @@ def add_vpn_peer():
return _conf_response(vpn_vlan, peer_name, peer_ip, private_key)
@bp.route('/action/edit_vpn_peer', methods=['POST'])
@bp.route('/action/vpn/peers_edit', methods=['POST'])
@require_level('administrator')
def edit_vpn_peer():
def peers_edit():
flat_idx = _row_index()
if flat_idx is None:
flash('Invalid request.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
peer_name = sanitize.name(request.form.get('name', ''))
split_tunnel = request.form.get('split_tunnel') in ('true', '1', 'on', 'yes')
@ -293,20 +295,20 @@ def edit_vpn_peer():
if not peer_name:
flash('Peer name is required.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
cfg = load_config()
vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
peers = vlan.get('peers', [])
if any(j != peer_idx and p.get('name') == peer_name for j, p in enumerate(peers)):
flash(f'A peer named "{peer_name}" already exists.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
before = copy.deepcopy({k: peers[peer_idx].get(k) for k in ('name', 'split_tunnel', 'enabled')})
peers[peer_idx].update({'name': peer_name, 'split_tunnel': split_tunnel, 'enabled': enabled})
@ -314,7 +316,7 @@ def edit_vpn_peer():
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
vlan_name = vlan['name']
flash(save_config_with_snapshot(
@ -323,24 +325,24 @@ def edit_vpn_peer():
before=before, after={'name': peer_name, 'split_tunnel': split_tunnel, 'enabled': enabled},
description=f'Edited VPN peer: {peer_name}',
), 'success')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
@bp.route('/action/toggle_vpn_peer', methods=['POST'])
@bp.route('/action/vpn/peers_toggle', methods=['POST'])
@require_level('administrator')
def toggle_vpn_peer():
def peers_toggle():
flat_idx = _row_index()
if flat_idx is None:
flash('Invalid request.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
cfg = load_config()
vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
peers = vlan.get('peers', [])
old_enabled = peers[peer_idx].get('enabled', True)
@ -349,7 +351,7 @@ def toggle_vpn_peer():
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
peer_name = peers[peer_idx]['name']
vlan_name = vlan['name']
@ -360,24 +362,24 @@ def toggle_vpn_peer():
before={'enabled': old_enabled}, after={'enabled': not old_enabled},
description=f'{action} VPN peer: {peer_name}',
), 'success')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
@bp.route('/action/delete_vpn_peer', methods=['POST'])
@bp.route('/action/vpn/peers_delete', methods=['POST'])
@require_level('administrator')
def delete_vpn_peer():
def peers_delete():
flat_idx = _row_index()
if flat_idx is None:
flash('Invalid request.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
cfg = load_config()
vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
peers = vlan.get('peers', [])
removed = peers.pop(peer_idx)
@ -385,7 +387,7 @@ def delete_vpn_peer():
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
vlan_name = vlan['name']
flash(save_config_with_snapshot(
@ -395,24 +397,24 @@ def delete_vpn_peer():
after=None,
description=f'Deleted VPN peer: {removed["name"]}',
), 'success')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
@bp.route('/action/regenerate_vpn_peer', methods=['POST'])
@bp.route('/action/vpn/peers_regenerate', methods=['POST'])
@require_level('administrator')
def regenerate_vpn_peer():
def peers_regenerate():
flat_idx = _row_index()
if flat_idx is None:
flash('Invalid request.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
cfg = load_config()
vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
private_key, public_key = _generate_wg_keypair()
peer = vlan['peers'][peer_idx]
@ -422,7 +424,7 @@ def regenerate_vpn_peer():
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
return redirect(f'/{_PAGE}')
vlan_name = vlan['name']
save_config_with_snapshot(