Development

This commit is contained in:
Matthew Grotke 2026-05-25 16:07:21 -04:00
parent b63aed53fc
commit 6221ee3691
12 changed files with 511 additions and 245 deletions

View file

@ -1,10 +1,11 @@
import base64
import copy
import ipaddress
import re
from flask import Blueprint, make_response, redirect, flash, request
from auth import require_level
from config_utils import load_core, save_core, verify_core_hash, queued_msg, CONFIGS_DIR, WEB_APP_DISPLAY_NAME
from config_utils import load_core, save_core_with_snapshot, verify_core_hash, CONFIGS_DIR, WEB_APP_DISPLAY_NAME
import sanitize
import validation as validate
@ -24,7 +25,6 @@ def _wg_vlan_by_name(core, name):
def _find_peer_by_flat_idx(core, flat_idx):
"""Return (vlan, peer_list_index) by flat index across all VPN VLANs in order."""
i = 0
for vlan in core.get('vlans', []):
if not vlan.get('is_vpn'):
@ -38,7 +38,6 @@ def _find_peer_by_flat_idx(core, flat_idx):
def _wg_iface(vlan, core):
"""Return the WireGuard interface name (wg0, wg1, ...) for a VPN VLAN."""
wg_vlans = [v for v in core.get('vlans', []) if v.get('is_vpn')]
idx = next((i for i, v in enumerate(wg_vlans) if v is vlan), 0)
return f'wg{idx}'
@ -59,7 +58,6 @@ def _hash_ok():
def _generate_wg_keypair():
"""Generate an X25519 keypair compatible with WireGuard. Returns (private_b64, public_b64)."""
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from cryptography.hazmat.primitives.serialization import (
Encoding, PublicFormat, PrivateFormat, NoEncryption,
@ -71,7 +69,6 @@ def _generate_wg_keypair():
def _server_pubkey(iface):
"""Read the server public key written by core.py --apply."""
try:
with open(f'{CONFIGS_DIR}/.{iface}.pub') as f:
return f.read().strip()
@ -80,7 +77,6 @@ def _server_pubkey(iface):
def _build_client_conf(vlan, peer_name, peer_ip, private_key, server_pubkey):
"""Build WireGuard client .conf content."""
info = vlan.get('vpn_information', {})
overrides = info.get('explicit_overrides', {})
subnet = vlan.get('subnet', '')
@ -89,22 +85,21 @@ def _build_client_conf(vlan, peer_name, peer_ip, private_key, server_pubkey):
ident_ips = [s['ip'] for s in vlan.get('server_identities', []) if s.get('ip')]
default = str(min((ipaddress.IPv4Address(ip) for ip in ident_ips),
key=lambda x: x.packed[-1])) if ident_ips else str(next(network.hosts()))
gateway = overrides.get('gateway') or default
dns = overrides.get('dns_server') or gateway
prefix = network.prefixlen
mtu = overrides.get('mtu', '')
endpoint = info.get('server_endpoint', '')
gateway = overrides.get('gateway') or default
dns = overrides.get('dns_server') or gateway
prefix = network.prefixlen
mtu = overrides.get('mtu', '')
endpoint = info.get('server_endpoint', '')
listen_port = info.get('listen_port', 51820)
split_tunnel = next(
(p.get('split_tunnel', False) for p in vlan.get('peers', []) if p.get('name') == peer_name),
False
False,
)
allowed_ips = f'{subnet}/{prefix}' if split_tunnel else '0.0.0.0/0'
lines = [
f'# Generated by {WEB_APP_DISPLAY_NAME}',
'',
f'# Generated by {WEB_APP_DISPLAY_NAME}', '',
'[Interface]',
f'PrivateKey = {private_key}',
f'Address = {peer_ip}/{prefix}',
@ -120,9 +115,8 @@ def _build_client_conf(vlan, peer_name, peer_ip, private_key, server_pubkey):
def _conf_response(vlan, peer_name, peer_ip, private_key):
"""Return a .conf file download response, or redirect with error if pubkey unavailable."""
core = load_core()
iface = _wg_iface(vlan, core)
core = load_core()
iface = _wg_iface(vlan, core)
server_pub = _server_pubkey(iface)
if not server_pub:
flash('Peer saved. Run sudo python3 ~/routlin/core.py --apply to generate the server '
@ -170,7 +164,7 @@ def apply_vpn():
if not _hash_ok():
return redirect(_VIEW)
core = load_core()
core = load_core()
vpn_vlan = _wg_vlan(core)
if vpn_vlan is None:
flash('No WireGuard VLAN found in configuration.', 'error')
@ -181,7 +175,8 @@ def apply_vpn():
flash(f'Listen port {listen_port} is already used by another VPN VLAN.', 'error')
return redirect(_VIEW)
info = vpn_vlan.setdefault('vpn_information', {})
before_info = copy.deepcopy(vpn_vlan.get('vpn_information', {}))
info = vpn_vlan.setdefault('vpn_information', {})
info['listen_port'] = listen_port
info['server_endpoint'] = server_endpoint
info['domain'] = domain
@ -201,8 +196,14 @@ def apply_vpn():
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
save_core(core)
flash(queued_msg('core apply'), 'success')
vlan_name = vpn_vlan['name']
flash(save_core_with_snapshot(
core,
path=f'vlans.{vlan_name}.vpn_information', key=vlan_name, operation='edit',
before=before_info or None, after=copy.deepcopy(info),
description=f'Updated VPN configuration for {vlan_name}',
), 'success')
return redirect(_VIEW)
@ -225,11 +226,10 @@ def add_vpn_peer():
if not peer_ip:
flash(f'"{peer_ip_raw}" is not a valid IP address.', 'error')
return redirect(_VIEW)
if not _hash_ok():
return redirect(_VIEW)
core = load_core()
core = load_core()
vpn_vlan = _wg_vlan_by_name(core, peer_vlan_nm)
if vpn_vlan is None:
flash(f'VPN VLAN "{peer_vlan_nm}" not found.', 'error')
@ -255,20 +255,27 @@ def add_vpn_peer():
return redirect(_VIEW)
private_key, public_key = _generate_wg_keypair()
peers.append({
entry = {
'name': peer_name,
'ip': peer_ip,
'public_key': public_key,
'split_tunnel': split_tunnel,
'enabled': enabled,
})
}
peers.append(entry)
errors = validate.validate_config(core)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
save_core(core)
save_core_with_snapshot(
core,
path=f'vlans.{peer_vlan_nm}.peers', key=peer_name, operation='add',
before=None, after={k: v for k, v in entry.items() if k != 'public_key'},
description=f'Added VPN peer: {peer_name} ({peer_ip})',
queue=True,
)
return _conf_response(vpn_vlan, peer_name, peer_ip, private_key)
@ -301,14 +308,21 @@ def edit_vpn_peer():
flash(f'A peer named "{peer_name}" already exists.', 'error')
return redirect(_VIEW)
before = copy.deepcopy({k: peers[peer_idx].get(k) for k in ('name', 'split_tunnel', 'enabled')})
peers[peer_idx].update({'name': peer_name, 'split_tunnel': split_tunnel, 'enabled': enabled})
errors = validate.validate_config(core)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
save_core(core)
flash(queued_msg('core apply'), 'success')
vlan_name = vlan['name']
flash(save_core_with_snapshot(
core,
path=f'vlans.{vlan_name}.peers', key=peer_name, operation='edit',
before=before, after={'name': peer_name, 'split_tunnel': split_tunnel, 'enabled': enabled},
description=f'Edited VPN peer: {peer_name}',
), 'success')
return redirect(_VIEW)
@ -328,15 +342,24 @@ def toggle_vpn_peer():
flash('Peer not found.', 'error')
return redirect(_VIEW)
peers = vlan.get('peers', [])
peers[peer_idx]['enabled'] = not peers[peer_idx].get('enabled', True)
peers = vlan.get('peers', [])
old_enabled = peers[peer_idx].get('enabled', True)
peers[peer_idx]['enabled'] = not old_enabled
errors = validate.validate_config(core)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
save_core(core)
flash(queued_msg('core apply'), 'success')
peer_name = peers[peer_idx]['name']
vlan_name = vlan['name']
action = 'Enabled' if not old_enabled else 'Disabled'
flash(save_core_with_snapshot(
core,
path=f'vlans.{vlan_name}.peers', key=peer_name, operation='toggle',
before={'enabled': old_enabled}, after={'enabled': not old_enabled},
description=f'{action} VPN peer: {peer_name}',
), 'success')
return redirect(_VIEW)
@ -356,14 +379,22 @@ def delete_vpn_peer():
flash('Peer not found.', 'error')
return redirect(_VIEW)
vlan.get('peers', []).pop(peer_idx)
peers = vlan.get('peers', [])
removed = peers.pop(peer_idx)
errors = validate.validate_config(core)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
save_core(core)
flash(queued_msg('core apply'), 'success')
vlan_name = vlan['name']
flash(save_core_with_snapshot(
core,
path=f'vlans.{vlan_name}.peers', key=removed['name'], operation='delete',
before={k: removed.get(k) for k in ('name', 'ip', 'split_tunnel', 'enabled')},
after=None,
description=f'Deleted VPN peer: {removed["name"]}',
), 'success')
return redirect(_VIEW)
@ -384,13 +415,21 @@ def regenerate_vpn_peer():
return redirect(_VIEW)
private_key, public_key = _generate_wg_keypair()
peer = vlan['peers'][peer_idx]
peer = vlan['peers'][peer_idx]
old_pub_key = peer.get('public_key', '')
peer['public_key'] = public_key
errors = validate.validate_config(core)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(_VIEW)
save_core(core)
vlan_name = vlan['name']
save_core_with_snapshot(
core,
path=f'vlans.{vlan_name}.peers', key=peer['name'], operation='regenerate',
before={'public_key': old_pub_key}, after={'public_key': public_key},
description=f'Regenerated keypair for VPN peer: {peer["name"]}',
queue=True,
)
return _conf_response(vlan, peer['name'], peer['ip'], private_key)