Development

This commit is contained in:
Matthew Grotke 2026-06-02 12:49:39 -04:00
parent 59ac3c5973
commit 3d0dc265ba
31 changed files with 1093 additions and 2794 deletions

View file

@ -1,9 +1,10 @@
import json
import sanitize
from config_utils import collect_layout_tokens
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
blank = [{'value': '', 'label': '-- Select timezone --'}]
return {
'TIMEZONE_OPTIONS': json.dumps(blank + [{'value': tz, 'label': tz} for tz in sanitize.VALID_TIMEZONES]),
}
tokens['TIMEZONE_OPTIONS'] = json.dumps(blank + [{'value': tz, 'label': tz} for tz in sanitize.VALID_TIMEZONES])
return tokens

View file

@ -0,0 +1,5 @@
from config_utils import collect_layout_tokens
def collect_tokens(cfg):
return collect_layout_tokens(cfg)

View file

@ -1,11 +1,17 @@
import json
from config_utils import collect_layout_tokens, load_datasource
from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
def collect_tokens(cfg):
return {
'ACCOUNT_LEVEL_OPTIONS': json.dumps([
{'value': 'viewer', 'label': 'Viewer (read-only access to live data)'},
{'value': 'administrator', 'label': 'Administrator (can modify configuration)'},
{'value': 'manager', 'label': 'Manager (full access including account management)'},
]),
}
tokens = collect_layout_tokens(cfg)
tokens['ACCOUNT_LEVEL_OPTIONS'] = json.dumps([
{'value': 'viewer', 'label': 'Viewer (read-only access to live data)'},
{'value': 'administrator', 'label': 'Administrator (can modify configuration)'},
{'value': 'manager', 'label': 'Manager (full access including account management)'},
])
content = load_json(f'{PAGES_DIR}/accountmanage/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
return tokens

View file

@ -0,0 +1,5 @@
from config_utils import collect_layout_tokens
def collect_tokens(cfg):
return collect_layout_tokens(cfg)

View file

@ -3,15 +3,14 @@ from collections import defaultdict
from datetime import datetime
from flask import session
from config_utils import (
get_dashboard_pending, load_all_groups, get_done_timestamps,
collect_layout_tokens, get_dashboard_pending, load_all_groups, get_done_timestamps,
_apply_changes_immediately, _find_cmd_in_queues, WEB_APP_DISPLAY_NAME,
)
from factory import LEVEL_RANK, e, client_level, build_snap_val, snap_expand_row
from view_common import _load_icon
from factory import LEVEL_RANK, e, client_level, build_snap_val, snap_expand_row, load_icon
def collect_tokens(cfg):
tokens = {}
tokens = collect_layout_tokens(cfg)
tokens['GENERAL_APPLY_ON_SAVE'] = 'true' if session.get('apply_changes_immediately', False) else 'false'
all_groups = load_all_groups()
@ -60,7 +59,7 @@ def collect_tokens(cfg):
tokens['NO_PENDING'] = 'true' if not pending_items else ''
tokens['NO_DISMISSIBLE_PENDING'] = 'true' if not any(c != 'fix problems' for _, _, c, _ in pending_items) else ''
tokens['APPLY_WARNING'] = (
f'<span style="color:var(--warning)"><p>{_load_icon("arrow-left")} <strong>Applying actions will briefly disrupt connections as network services are restarted.</strong></p></span>'
f'<span style="color:var(--warning)"><p>{load_icon("arrow-left")} <strong>Applying actions will briefly disrupt connections as network services are restarted.</strong></p></span>'
if pending_items else ''
)

View file

@ -0,0 +1,11 @@
from config_utils import collect_layout_tokens, load_datasource
from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
content = load_json(f'{PAGES_DIR}/bannedips/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
return tokens

View file

@ -1,9 +1,13 @@
import json
import re
import os
from view_common import load_ddns, public_ip_info, ddns_last_checked, CONFIGS_DIR
from config_utils import (
collect_layout_tokens, load_datasource, CONFIGS_DIR, relative_time,
)
from factory import load_ddns, load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
import validation as validate
DDNS_LOG_MAX = 50
@ -42,8 +46,47 @@ def _ddns_log_tail():
return '(error reading log)', ''
def _read_cached_ip():
try:
best_ip, best_mtime = '', 0.0
for fname in os.listdir(CONFIGS_DIR):
if fname.startswith('.ddns-last-ip-'):
path = f'{CONFIGS_DIR}/{fname}'
mtime = os.path.getmtime(path)
if mtime > best_mtime:
ip = open(path).read().strip()
if ip:
best_ip, best_mtime = ip, mtime
return best_ip, (best_mtime if best_ip else None)
except Exception:
return '', None
def public_ip_info(ddns_cfg):
from datetime import datetime, timezone
enabled_p = [p for p in ddns_cfg.get('providers', []) if p.get('enabled', True)]
all_hosts = []
for p in enabled_p:
all_hosts.extend(p.get('hostnames', p.get('subdomains', [])))
domains_sub = ', '.join(all_hosts)
ip, mtime = _read_cached_ip()
last_obtained = f'Obtained: {relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago' if mtime else ''
if ip:
return ip, domains_sub, last_obtained
return 'Offline', domains_sub, ''
def ddns_last_checked():
from datetime import datetime, timezone
try:
mtime = os.path.getmtime(f'{CONFIGS_DIR}/.ddns-last-service')
return f'Last checked: {relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago'
except OSError:
return 'Last checked: ---'
def collect_tokens(cfg):
tokens = {}
tokens = collect_layout_tokens(cfg)
ddns = load_ddns()
ddns_gen = ddns.get('general', {})
tokens['DDNS_TIMER_INTERVAL'] = ddns_gen.get('timer_interval', '-')
@ -68,4 +111,8 @@ def collect_tokens(cfg):
tokens['STAT_PUBLIC_IP_LAST_OBTAINED'] = last_obtained
tokens['STAT_PUBLIC_IP_LAST_CHECKED'] = ddns_last_checked()
tokens['DDNS_LOG_TAIL'], tokens['DDNS_LOG_SUMMARY'] = _ddns_log_tail()
content = load_json(f'{PAGES_DIR}/ddns/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
return tokens

View file

@ -1,237 +0,0 @@
from pathlib import Path
import copy
import ipaddress
from flask import Blueprint, request, redirect, flash
from auth import require_level
from config_utils import load_config, record_group, diff_fields, verify_config_hash
import sanitize
import validation as validate
_PAGE = Path(__file__).parent.name
bp = Blueprint(_PAGE, __name__)
def _row_index():
try:
return int(request.form.get('row_index', ''))
except (ValueError, TypeError):
return None
def _hash_ok():
if not verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return False
return True
def _parse_ip():
raw = request.form.get('ip', '').strip()
if not raw:
return 'dynamic'
ip = validate.ip(raw)
if not ip:
flash(f'The configuration has not been saved because "{raw}" is not a valid IP address.', 'error')
return None
return ip
def _check_ip_in_vlan_subnet(ip, vlan):
if not ip or ip == 'dynamic':
return None
subnet = vlan.get('subnet')
prefix = vlan.get('subnet_mask')
if not subnet or prefix is None:
return None
try:
network = ipaddress.IPv4Network(f'{subnet}/{prefix}', strict=False)
addr = ipaddress.IPv4Address(ip)
if addr == network.network_address:
return f'{ip} is the network address and cannot be assigned.'
if addr == network.broadcast_address:
return f'{ip} is the broadcast address and cannot be assigned.'
if addr not in network:
return f'{ip} is not within the {vlan["name"]} subnet ({subnet}/{prefix}).'
except ValueError:
return f'{ip} is not a valid IP address.'
return None
@bp.route('/action/dhcp/addreservation_add', methods=['POST'])
@require_level('administrator')
def addreservation_add():
vlan_name = sanitize.name(request.form.get('vlan_name', ''))
description = sanitize.text(request.form.get('description', ''))
hostname = validate.domainname(request.form.get('hostname', ''))
mac = sanitize.mac(request.form.get('mac', ''))
ip = _parse_ip()
radius_client = 'radius_client' in request.form
if ip is None:
return redirect(f'/{_PAGE}')
if not vlan_name:
flash('The configuration has not been saved because a VLAN is required.', 'error')
return redirect(f'/{_PAGE}')
if not mac:
flash('The configuration has not been saved because a MAC address is required.', 'error')
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(f'/{_PAGE}')
cfg = load_config()
vlans = cfg.get('vlans', [])
vlan = next((v for v in vlans if v.get('name') == vlan_name), None)
if vlan is None:
flash(f'The configuration has not been saved because VLAN "{vlan_name}" was not found.', 'error')
return redirect(f'/{_PAGE}')
subnet_err = _check_ip_in_vlan_subnet(ip, vlan)
if subnet_err:
flash(f'The configuration has not been saved because {subnet_err}', 'error')
return redirect(f'/{_PAGE}')
conflict = validate.check_reservation_ip_conflicts(ip, vlan)
if conflict:
flash(f'The configuration has not been saved because {conflict}', 'error')
return redirect(f'/{_PAGE}')
entry = {
'description': description,
'hostname': hostname,
'mac': mac,
'ip': ip,
'radius_client': radius_client,
'enabled': True,
'vlan': vlan_name,
}
cfg.setdefault('dhcp_reservations', []).append(entry)
errors = validate.validate_config(cfg)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
changes = diff_fields(None, entry)
flash(record_group(cfg, 'dhcp_reservations', 'mac', mac, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dhcp/reservations_toggle', methods=['POST'])
@require_level('administrator')
def reservations_toggle():
idx = _row_index()
if idx is None:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(f'/{_PAGE}')
cfg = load_config()
items = cfg.get('dhcp_reservations', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
return redirect(f'/{_PAGE}')
res = items[idx]
old_enabled = res.get('enabled', True)
before = copy.deepcopy(res)
res['enabled'] = not old_enabled
errors = validate.validate_config(cfg)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
changes = diff_fields(before, res)
flash(record_group(cfg, 'dhcp_reservations', 'mac', res['mac'], changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dhcp/reservations_edit', methods=['POST'])
@require_level('administrator')
def reservations_edit():
idx = _row_index()
if idx is None:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
description = sanitize.text(request.form.get('description', ''))
hostname = validate.domainname(request.form.get('hostname', ''))
mac = sanitize.mac(request.form.get('mac', ''))
ip = _parse_ip()
radius_client = 'radius_client' in request.form
if ip is None:
return redirect(f'/{_PAGE}')
if not mac:
flash('The configuration has not been saved because a MAC address is required.', 'error')
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(f'/{_PAGE}')
cfg = load_config()
items = cfg.get('dhcp_reservations', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
return redirect(f'/{_PAGE}')
res = items[idx]
vlan_name = res.get('vlan', '')
vlan = next((v for v in cfg.get('vlans', []) if v.get('name') == vlan_name), None)
if vlan:
subnet_err = _check_ip_in_vlan_subnet(ip, vlan)
if subnet_err:
flash(f'The configuration has not been saved because {subnet_err}', 'error')
return redirect(f'/{_PAGE}')
conflict = validate.check_reservation_ip_conflicts(ip, vlan)
if conflict:
flash(f'The configuration has not been saved because {conflict}', 'error')
return redirect(f'/{_PAGE}')
before = copy.deepcopy(res)
res.update({
'description': description,
'hostname': hostname,
'mac': mac,
'ip': ip,
'radius_client': radius_client,
'enabled': 'enabled' in request.form,
})
errors = validate.validate_config(cfg)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
changes = diff_fields(before, res)
flash(record_group(cfg, 'dhcp_reservations', 'mac', mac, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dhcp/reservations_delete', methods=['POST'])
@require_level('administrator')
def reservations_delete():
idx = _row_index()
if idx is None:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(f'/{_PAGE}')
cfg = load_config()
items = cfg.get('dhcp_reservations', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
return redirect(f'/{_PAGE}')
removed = items.pop(idx)
errors = validate.validate_config(cfg)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
changes = diff_fields(removed, None)
flash(record_group(cfg, 'dhcp_reservations', 'mac', removed['mac'], changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')

View file

@ -1,235 +0,0 @@
{
"client_requirement": "client_is_viewer+",
"items": [
{
"type": "header_page_title",
"items": [
{
"type": "h1",
"text": "DHCP"
},
{
"type": "p",
"text": "Active leases, IP reservations, and VLAN authorizations."
}
]
},
{
"type": "table",
"datasource": "live:dhcp_leases",
"empty_message": "No active DHCP leases found.",
"columns": [
{
"label": "Hostname",
"field": "hostname"
},
{
"label": "IP Address",
"field": "ip_address",
"class": "col-mono"
},
{
"label": "MAC Address",
"field": "mac_address",
"class": "col-mono"
},
{
"label": "VLAN",
"field": "vlan_name"
},
{
"label": "Expires",
"field": "expires"
}
]
},
{
"type": "table",
"datasource": "config:dhcp_reservations",
"empty_message": "No DHCP reservations configured.",
"columns": [
{
"label": "Description",
"field": "description"
},
{
"label": "Hostname",
"field": "hostname",
"class": "col-mono"
},
{
"label": "MAC",
"field": "mac",
"class": "col-mono"
},
{
"label": "IP",
"field": "ip",
"class": "col-mono"
},
{
"label": "VLAN",
"field": "vlan_name"
},
{
"label": "RADIUS",
"field": "radius_client",
"render": "badge_yes_no"
},
{
"label": "Status",
"field": "enabled",
"render": "badge_enabled_disabled"
}
],
"toolbar": {
"items": [
{
"type": "select",
"name": "vlan_filter",
"value": "all",
"options": "%VLAN_FILTER_OPTIONS%",
"filter_col": "vlan_name"
}
]
},
"row_actions": [
{
"client_requirement": "client_is_administrator+",
"action": "/action/dhcp/reservations_edit",
"method": "inline_edit",
"text": "Edit",
"class": "btn-ghost btn-sm",
"fields": [
{
"col": "description",
"input_type": "text"
},
{
"col": "hostname",
"input_type": "text",
"validate": "VALIDATION_NETWORK_NAME"
},
{
"col": "mac",
"input_type": "text",
"validate": "VALIDATION_MAC"
},
{
"col": "ip",
"input_type": "text",
"validate": "VALIDATION_IPV4_FORMAT|VALIDATION_ADDRESS",
"attrs": {
"data-vlan-subnets": "%VLAN_SUBNET_INFO_JSON%"
}
},
{
"col": "radius_client",
"input_type": "checkbox",
"checkbox_label": "Enabled"
},
{
"col": "enabled",
"input_type": "checkbox",
"checkbox_label": "Enabled"
}
]
},
{
"client_requirement": "client_is_administrator+",
"action": "/action/dhcp/reservations_delete",
"method": "post",
"text": "Delete",
"class": "btn-danger btn-sm"
}
]
},
{
"type": "card",
"id": "add-form",
"label": "Add Reservation/Authorization",
"client_requirement": "client_is_administrator+",
"items": [
{
"type": "form",
"action": "/action/dhcp/addreservation_add",
"method": "post",
"items": [
{
"type": "field",
"label": "VLAN",
"name": "vlan_name",
"input_type": "select",
"options": "%VLAN_NAMES_AS_OPTIONS%",
"hint": "VLAN this reservation belongs to."
},
{
"type": "field",
"label": "Description",
"name": "description",
"input_type": "text",
"placeholder": "e.g. NAS"
},
{
"type": "field",
"label": "Hostname",
"name": "hostname",
"input_type": "text",
"validate": "VALIDATION_NETWORK_NAME",
"optional": true,
"placeholder": "e.g. nas",
"attrs": {
"data-res-hosts-by-vlan": "%RESERVATION_HOSTNAMES_BY_VLAN_JSON%"
}
},
{
"type": "field",
"label": "MAC Address",
"name": "mac",
"input_type": "text",
"validate": "VALIDATION_MAC",
"placeholder": "e.g. aa:bb:cc:dd:ee:ff"
},
{
"type": "field",
"label": "IP Address",
"name": "ip",
"input_type": "text",
"validate": "VALIDATION_IPV4_FORMAT|VALIDATION_ADDRESS",
"optional": true,
"placeholder": "e.g. 192.168.10.50",
"hint": "Leave blank to authorize device on this VLAN dynamically.",
"attrs": {
"data-res-ips-by-vlan": "%RESERVATION_IPS_BY_VLAN_JSON%",
"data-vlan-subnets": "%VLAN_SUBNET_INFO_JSON%",
"data-vlan-select": "vlan_name"
}
},
{
"type": "field",
"label": "RADIUS Client",
"name": "radius_client",
"input_type": "checkbox",
"hint": "This device acts as a RADIUS authenticator, verifying credentials of other devices on the network."
},
{
"type": "button_row",
"items": [
{
"type": "button_primary",
"action": "/action/dhcp/addreservation_add",
"method": "post",
"text": "Add"
},
{
"type": "button_cancel",
"text": "Cancel"
}
]
}
]
}
]
}
]
}

View file

@ -1,27 +0,0 @@
import json
def collect_tokens(cfg):
vlans = cfg.get('vlans', [])
vlan_names = [v.get('name', '') for v in vlans]
res_ips_by_vlan, res_hosts_by_vlan = {}, {}
for v in vlans:
vn = v.get('name', '')
if not vn:
continue
vlan_res = [r for r in cfg.get('dhcp_reservations', []) if r.get('vlan') == vn]
res_ips_by_vlan[vn] = [r['ip'] for r in vlan_res if r.get('ip') and r['ip'] != 'dynamic']
res_hosts_by_vlan[vn] = [r['hostname'] for r in vlan_res if r.get('hostname')]
filter_opts = '<option value="all">All VLANs</option>' + ''.join(
f'<option value="{n}">{n}</option>' for n in vlan_names
)
return {
'VLAN_FILTER_OPTIONS': filter_opts,
'VLAN_NAMES_AS_OPTIONS': json.dumps([{'value': n, 'label': n} for n in vlan_names]),
'VLAN_SUBNET_INFO_JSON': json.dumps({
v.get('name', ''): {'subnet': v.get('subnet', ''), 'prefix': v.get('subnet_mask', 0)}
for v in vlans if v.get('name') and v.get('subnet')
}),
'RESERVATION_IPS_BY_VLAN_JSON': json.dumps(res_ips_by_vlan),
'RESERVATION_HOSTNAMES_BY_VLAN_JSON': json.dumps(res_hosts_by_vlan),
}

View file

@ -1,7 +1,133 @@
import os
import glob
from datetime import datetime, timezone
from config_utils import collect_layout_tokens, load_config, relative_time
from factory import (
load_json, build_table, table_token_key, iter_table_items, PAGES_DIR, e,
)
try:
import manuf as _manuf_mod
_mac_parser = _manuf_mod.MacParser()
except Exception:
_mac_parser = None
try:
from mac_vendor_lookup import MacLookup as _MacLookup
_mac_lookup = _MacLookup()
except Exception:
_mac_lookup = None
def _get_vendor(mac):
short, long = '', ''
if _mac_parser:
try:
short = _mac_parser.get_manuf(mac) or ''
except Exception:
pass
if _mac_lookup:
try:
long = _mac_lookup.lookup(mac) or ''
except Exception:
pass
return (short, long)
def _vendor_cell(vendor):
short, long = vendor
display = short if short else (long[:8] if long else '')
if not display:
return '-'
if long:
return f'<span data-vendor-long="{e(long)}">{e(display)}</span>'
return e(display)
def _parse_lease_secs(s):
s = str(s).strip().lower()
try:
if s.endswith('h'): return int(s[:-1]) * 3600
if s.endswith('m'): return int(s[:-1]) * 60
if s.endswith('d'): return int(s[:-1]) * 86400
except ValueError:
pass
return None
def live_dhcp_leases():
rows = []
now = int(datetime.now(tz=timezone.utc).timestamp())
cfg = load_config()
vlans = cfg.get('vlans', [])
vlan_lease_secs = {
v['name']: _parse_lease_secs(v.get('dhcp_information', {}).get('lease_time', ''))
for v in vlans if v.get('name')
}
mac_to_res = {
r['mac'].lower(): r['hostname']
for r in cfg.get('dhcp_reservations', [])
if r.get('mac') and r.get('hostname')
}
for leases_file in glob.glob('/var/lib/misc/dnsmasq-routlin-*.leases'):
stem = os.path.basename(leases_file)
vlan_name = stem[len('dnsmasq-routlin-'):-len('.leases')]
lease_secs = vlan_lease_secs.get(vlan_name)
try:
with open(leases_file) as f:
for line in f:
parts = line.strip().split()
if len(parts) < 4:
continue
expiry = int(parts[0])
if expiry < now:
continue
obtained_ts = (expiry - lease_secs) if lease_secs else None
renews_ts = (expiry - lease_secs // 2) if lease_secs else None
if obtained_ts is None:
last_active = '-'
elif obtained_ts <= now:
last_active = relative_time(obtained_ts, now, short=True) + ' ago'
elif renews_ts and renews_ts > now:
last_active = 'ETA ' + relative_time(renews_ts, now, short=True)
else:
last_active = 'ETA soon'
mac_norm = parts[1].lower()
device_h = parts[3] if parts[3] != '*' else None
res_h = mac_to_res.get(mac_norm)
if res_h and device_h and device_h.lower() != res_h.lower():
hostname_html = f'<strong>{e(res_h)}</strong><br/>({e(device_h)})'
elif res_h:
hostname_html = f'<strong>{e(res_h)}</strong>'
elif device_h:
hostname_html = e(device_h)
else:
hostname_html = '-'
rows.append({
'hostname': hostname_html,
'ip_address': parts[2],
'mac_address': parts[1],
'vendor': _vendor_cell(_get_vendor(parts[1])),
'vlan_name': vlan_name,
'last_active': last_active,
'renews': 'in ' + relative_time(renews_ts or expiry, now, short=True),
})
except Exception:
pass
return rows
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
vlan_names = [v.get('name', '') for v in vlans]
filter_opts = '<option value="all">All VLANs</option>' + ''.join(
f'<option value="{n}">{n}</option>' for n in vlan_names
)
return {'VLAN_FILTER_OPTIONS': filter_opts}
tokens['VLAN_FILTER_OPTIONS'] = filter_opts
content = load_json(f'{PAGES_DIR}/dhcpleases/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
rows = live_dhcp_leases() if ds == 'live:dhcp_leases' else []
tokens[table_token_key(ds)] = build_table(table_item, tokens, rows)
return tokens

View file

@ -1,7 +1,10 @@
import json
from config_utils import collect_layout_tokens, load_datasource
from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
vlan_names = [v.get('name', '') for v in vlans]
res_ips_by_vlan, res_hosts_by_vlan = {}, {}
@ -15,13 +18,16 @@ def collect_tokens(cfg):
filter_opts = '<option value="all">All VLANs</option>' + ''.join(
f'<option value="{n}">{n}</option>' for n in vlan_names
)
return {
'VLAN_FILTER_OPTIONS': filter_opts,
'VLAN_NAMES_AS_OPTIONS': json.dumps([{'value': n, 'label': n} for n in vlan_names]),
'VLAN_SUBNET_INFO_JSON': json.dumps({
v.get('name', ''): {'subnet': v.get('subnet', ''), 'prefix': v.get('subnet_mask', 0)}
for v in vlans if v.get('name') and v.get('subnet')
}),
'RESERVATION_IPS_BY_VLAN_JSON': json.dumps(res_ips_by_vlan),
'RESERVATION_HOSTNAMES_BY_VLAN_JSON': json.dumps(res_hosts_by_vlan),
}
tokens['VLAN_FILTER_OPTIONS'] = filter_opts
tokens['VLAN_NAMES_AS_OPTIONS'] = json.dumps([{'value': n, 'label': n} for n in vlan_names])
tokens['VLAN_SUBNET_INFO_JSON'] = json.dumps({
v.get('name', ''): {'subnet': v.get('subnet', ''), 'prefix': v.get('subnet_mask', 0)}
for v in vlans if v.get('name') and v.get('subnet')
})
tokens['RESERVATION_IPS_BY_VLAN_JSON'] = json.dumps(res_ips_by_vlan)
tokens['RESERVATION_HOSTNAMES_BY_VLAN_JSON'] = json.dumps(res_hosts_by_vlan)
content = load_json(f'{PAGES_DIR}/dhcpreservations/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
return tokens

View file

@ -1,11 +1,11 @@
import json
import os
from datetime import datetime, timezone
from view_common import fmt_bytes, relative_time, BLOCKLISTS_DIR
from factory import e
from config_utils import collect_layout_tokens, load_datasource, fmt_bytes, relative_time, BLOCKLISTS_DIR
from factory import e, load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
def _blocklist_stats_html(cfg):
def blocklist_stats_html(cfg):
rows = ''
for bl in cfg.get('dns_blocking', {}).get('blocklists', []):
name = e(bl.get('name', ''))
@ -44,14 +44,18 @@ def _blocklist_stats_html(cfg):
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
dns_blk_gen = cfg.get('dns_blocking', {}).get('general', {})
return {
'GENERAL_LOG_MAX_KB': str(dns_blk_gen.get('log_max_kb', '-')),
'GENERAL_LOG_ERRORS_ONLY': 'true' if dns_blk_gen.get('log_errors_only') else 'false',
'GENERAL_DAILY_EXECUTE_TIME': str(dns_blk_gen.get('daily_execute_time_24hr_local', '-')),
'BLOCKLIST_STATS_HTML': _blocklist_stats_html(cfg),
'BLOCKLIST_FORMAT_OPTIONS': json.dumps([
{'value': 'hosts', 'label': 'hosts (hosts file format)'},
{'value': 'dnsmasq', 'label': 'dnsmasq (local=/ syntax)'},
]),
}
tokens['GENERAL_LOG_MAX_KB'] = str(dns_blk_gen.get('log_max_kb', '-'))
tokens['GENERAL_LOG_ERRORS_ONLY'] = 'true' if dns_blk_gen.get('log_errors_only') else 'false'
tokens['GENERAL_DAILY_EXECUTE_TIME'] = str(dns_blk_gen.get('daily_execute_time_24hr_local', '-'))
tokens['BLOCKLIST_STATS_HTML'] = blocklist_stats_html(cfg)
tokens['BLOCKLIST_FORMAT_OPTIONS'] = json.dumps([
{'value': 'hosts', 'label': 'hosts (hosts file format)'},
{'value': 'dnsmasq', 'label': 'dnsmasq (local=/ syntax)'},
])
content = load_json(f'{PAGES_DIR}/dnsblocking/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
return tokens

View file

@ -1,11 +1,12 @@
import json
from config_utils import collect_layout_tokens
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
dns = cfg.get('upstream_dns', {})
servers = dns.get('upstream_servers', [])
return {
'DNS_STRICT_ORDER': 'true' if dns.get('strict_order') else 'false',
'DNS_CACHE_SIZE': str(dns.get('cache_size', '-')),
'DNS_UPSTREAM_SERVERS_JSON': json.dumps(servers),
}
tokens['DNS_STRICT_ORDER'] = 'true' if dns.get('strict_order') else 'false'
tokens['DNS_CACHE_SIZE'] = str(dns.get('cache_size', '-'))
tokens['DNS_UPSTREAM_SERVERS_JSON'] = json.dumps(servers)
return tokens

View file

@ -0,0 +1,11 @@
from config_utils import collect_layout_tokens, load_datasource
from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
content = load_json(f'{PAGES_DIR}/hostoverrides/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
return tokens

View file

@ -1,11 +1,17 @@
import json
from config_utils import collect_layout_tokens, load_datasource
from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
def collect_tokens(cfg):
return {
'PROTOCOL_OPTIONS': json.dumps([
{'value': 'tcp', 'label': 'TCP'},
{'value': 'udp', 'label': 'UDP'},
{'value': 'both', 'label': 'TCP/UDP'},
]),
}
tokens = collect_layout_tokens(cfg)
tokens['PROTOCOL_OPTIONS'] = json.dumps([
{'value': 'tcp', 'label': 'TCP'},
{'value': 'udp', 'label': 'UDP'},
{'value': 'both', 'label': 'TCP/UDP'},
])
content = load_json(f'{PAGES_DIR}/intervlan/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
return tokens

View file

@ -0,0 +1,5 @@
from config_utils import collect_layout_tokens
def collect_tokens(cfg):
return collect_layout_tokens(cfg)

View file

@ -1,15 +1,21 @@
import json
from config_utils import collect_layout_tokens, load_datasource
from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
dv = next((v for v in vlans if v.get('radius_default')), None)
return {
'EXISTING_VLAN_IDS_JSON': json.dumps([v.get('vlan_id') for v in vlans]),
'EXISTING_VLAN_NAMES_JSON': json.dumps([v.get('name') for v in vlans]),
'RADIUS_DEFAULT_VLAN': f'"{dv["name"]}" (VLAN {dv["vlan_id"]})' if dv else 'none set',
'BLOCKLIST_NAME_OPTIONS': json.dumps([
{'value': bl.get('name', ''), 'label': bl.get('description', bl.get('name', ''))}
for bl in cfg.get('dns_blocking', {}).get('blocklists', [])
]),
}
tokens['EXISTING_VLAN_IDS_JSON'] = json.dumps([v.get('vlan_id') for v in vlans])
tokens['EXISTING_VLAN_NAMES_JSON'] = json.dumps([v.get('name') for v in vlans])
tokens['RADIUS_DEFAULT_VLAN'] = f'"{dv["name"]}" (VLAN {dv["vlan_id"]})' if dv else 'none set'
tokens['BLOCKLIST_NAME_OPTIONS'] = json.dumps([
{'value': bl.get('name', ''), 'label': bl.get('description', bl.get('name', ''))}
for bl in cfg.get('dns_blocking', {}).get('blocklists', [])
])
content = load_json(f'{PAGES_DIR}/networklayout/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
return tokens

View file

@ -1,6 +1,9 @@
import re
import os
from view_common import run, load_ddns, public_ip_info, live_dhcp_leases, fmt_timestamp, BLOCKLISTS_DIR
from config_utils import collect_layout_tokens, fmt_timestamp, BLOCKLISTS_DIR
from factory import run, load_ddns
from pages.ddns.view import public_ip_info
from pages.dhcpleases.view import live_dhcp_leases
def get_dnsmasq_stats():
@ -32,12 +35,12 @@ def get_dnsmasq_stats():
return stats
def _count_blocked_today():
def count_blocked_today():
out = run("journalctl -u dnsmasq --since today --no-pager 2>/dev/null | grep -c 'is NXDOMAIN'")
return out or '0'
def _count_blocked_domains():
def count_blocked_domains():
try:
total = sum(
int(run(f'wc -l < "{BLOCKLISTS_DIR}/{f}"') or 0)
@ -48,7 +51,7 @@ def _count_blocked_domains():
return '-'
def _bl_last_update():
def bl_last_update():
try:
mtime = max(
os.path.getmtime(f'{BLOCKLISTS_DIR}/{f}')
@ -60,6 +63,7 @@ def _bl_last_update():
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
non_vpn_vlans = [v for v in vlans if not v.get('is_vpn')]
vlan_names = [v.get('name', '') for v in vlans]
@ -69,26 +73,25 @@ def collect_tokens(cfg):
ddns = load_ddns()
ip_str, domains_sub, last_obtained = public_ip_info(ddns)
return {
'GENERAL_WAN_INTERFACE': str(net.get('wan_interface', '-')),
'OVERVIEW_VLAN_NAMES': ', '.join(vlan_names) or '-',
'STAT_VLAN_COUNT': str(len(non_vpn_vlans)),
'STAT_LEASE_COUNT': str(len(live_dhcp_leases())),
'STAT_BANNED_IP_COUNT': str(sum(1 for b in cfg.get('banned_ips', []) if b.get('enabled', True))),
'STAT_BLOCKLIST_COUNT': str(len(cfg.get('dns_blocking', {}).get('blocklists', []))),
'STAT_BLOCKED_TODAY': _count_blocked_today(),
'STAT_BLOCKED_DOMAINS': _count_blocked_domains(),
'STAT_BL_LAST_UPDATE': _bl_last_update(),
'STAT_UPTIME': run('uptime -p') or '-',
'STAT_NFTABLES_STATUS': 'Active' if run('nft list tables 2>/dev/null').strip() else 'Inactive',
'STAT_PUBLIC_IP': ip_str,
'STAT_DDNS_HOSTNAME': domains_sub,
'DNS_CACHE_SIZE': str(dns.get('cache_size', '-')),
'OVERVIEW_UPSTREAM_SERVERS': ', '.join(dns.get('upstream_servers', [])) or '-',
'DNS_STAT_QUERIES': dns_stats['queries'],
'DNS_STAT_HITS': dns_stats['hits'],
'DNS_STAT_HIT_RATE': dns_stats['hit_rate'],
'DNS_STAT_FORWARDED': dns_stats['forwarded'],
'DNS_STAT_AUTH': dns_stats['auth'],
'DNS_STAT_TCP_PEAK': dns_stats['tcp_peak'],
}
tokens['GENERAL_WAN_INTERFACE'] = str(net.get('wan_interface', '-'))
tokens['OVERVIEW_VLAN_NAMES'] = ', '.join(vlan_names) or '-'
tokens['STAT_VLAN_COUNT'] = str(len(non_vpn_vlans))
tokens['STAT_LEASE_COUNT'] = str(len(live_dhcp_leases()))
tokens['STAT_BANNED_IP_COUNT'] = str(sum(1 for b in cfg.get('banned_ips', []) if b.get('enabled', True)))
tokens['STAT_BLOCKLIST_COUNT'] = str(len(cfg.get('dns_blocking', {}).get('blocklists', [])))
tokens['STAT_BLOCKED_TODAY'] = count_blocked_today()
tokens['STAT_BLOCKED_DOMAINS'] = count_blocked_domains()
tokens['STAT_BL_LAST_UPDATE'] = bl_last_update()
tokens['STAT_UPTIME'] = run('uptime -p') or '-'
tokens['STAT_NFTABLES_STATUS'] = 'Active' if run('nft list tables 2>/dev/null').strip() else 'Inactive'
tokens['STAT_PUBLIC_IP'] = ip_str
tokens['STAT_DDNS_HOSTNAME'] = domains_sub
tokens['DNS_CACHE_SIZE'] = str(dns.get('cache_size', '-'))
tokens['OVERVIEW_UPSTREAM_SERVERS'] = ', '.join(dns.get('upstream_servers', [])) or '-'
tokens['DNS_STAT_QUERIES'] = dns_stats['queries']
tokens['DNS_STAT_HITS'] = dns_stats['hits']
tokens['DNS_STAT_HIT_RATE'] = dns_stats['hit_rate']
tokens['DNS_STAT_FORWARDED'] = dns_stats['forwarded']
tokens['DNS_STAT_AUTH'] = dns_stats['auth']
tokens['DNS_STAT_TCP_PEAK'] = dns_stats['tcp_peak']
return tokens

View file

@ -1,8 +1,74 @@
import json
from view_common import get_system_interfaces, iface_info
import os
from config_utils import collect_layout_tokens
_EXCLUDE_PREFIXES = ('lo', 'wg', 'docker', 'br-', 'veth', 'tun', 'tap', 'ppp', 'virbr', 'podman', 'vnet', 'macvtap', 'fc-')
def get_system_interfaces():
try:
return sorted(
n for n in os.listdir('/sys/class/net')
if not n.startswith(_EXCLUDE_PREFIXES)
and os.path.exists(f'/sys/class/net/{n}/device')
)
except Exception:
return []
def iface_info(iface):
base = f'/sys/class/net/{iface}'
def rd(path):
try:
with open(f'{base}/{path}') as f:
return f.read().strip()
except Exception:
return None
def int_val(val):
try:
return int(val) if val else None
except ValueError:
return None
wireless = os.path.isdir(f'{base}/wireless')
state = (rd('operstate') or 'unknown').upper()
if state == 'UNKNOWN':
state = 'UP'
carrier_raw = rd('carrier')
carrier = (carrier_raw == '1') if carrier_raw is not None else None
speed_raw = rd('speed')
try:
mbps = int(speed_raw)
if mbps <= 0:
speed = None
elif mbps >= 1000 and mbps % 1000 == 0:
speed = f'{mbps // 1000} Gbps'
else:
speed = f'{mbps} Mbps'
except (TypeError, ValueError):
speed = None
mac = rd('address')
perm_mac = rd('perm_address')
if perm_mac and perm_mac == '00:00:00:00:00:00':
perm_mac = None
return {
'name': iface,
'wireless': wireless,
'state': state,
'carrier': carrier,
'speed': speed,
'mtu': rd('mtu'),
'min_mtu': int_val(rd('min_mtu')),
'max_mtu': int_val(rd('max_mtu')),
'mac': mac,
'perm_mac': perm_mac,
}
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
net = cfg.get('network_interfaces', {})
wan = net.get('wan_interface', '')
lan = net.get('lan_interface', '')
@ -12,8 +78,6 @@ def collect_tokens(cfg):
sys_ifaces.append(configured)
sys_ifaces.sort()
iface_data = [iface_info(i) for i in sys_ifaces]
return {
'GENERAL_WAN_INTERFACE': str(wan or '-'),
'GENERAL_LAN_INTERFACE': str(lan or '-'),
'NETWORK_INTERFACE_DATA_JSON': json.dumps(iface_data),
}
tokens['GENERAL_WAN_INTERFACE'] = str(wan or '-')
tokens['NETWORK_INTERFACE_DATA_JSON'] = json.dumps(iface_data)
return tokens

View file

@ -1,11 +1,17 @@
import json
from config_utils import collect_layout_tokens, load_datasource
from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
def collect_tokens(cfg):
return {
'PROTOCOL_OPTIONS': json.dumps([
{'value': 'tcp', 'label': 'TCP'},
{'value': 'udp', 'label': 'UDP'},
{'value': 'both', 'label': 'TCP/UDP'},
]),
}
tokens = collect_layout_tokens(cfg)
tokens['PROTOCOL_OPTIONS'] = json.dumps([
{'value': 'tcp', 'label': 'TCP'},
{'value': 'udp', 'label': 'UDP'},
{'value': 'both', 'label': 'TCP/UDP'},
])
content = load_json(f'{PAGES_DIR}/portforwarding/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
return tokens

View file

@ -1,22 +1,28 @@
import json
from config_utils import collect_layout_tokens, load_datasource
from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
vlan_names = [v.get('name', '') for v in vlans]
filter_opts = '<option value="all">All VLANs</option>' + ''.join(
f'<option value="{n}">{n}</option>' for n in vlan_names
)
return {
'PROTOCOL_OPTIONS': json.dumps([
{'value': 'tcp', 'label': 'TCP'},
{'value': 'udp', 'label': 'UDP'},
{'value': 'both', 'label': 'TCP/UDP'},
]),
'VLAN_FILTER_OPTIONS': filter_opts,
'VLAN_NAMES_AS_OPTIONS': json.dumps([{'value': n, 'label': n} for n in vlan_names]),
'VLAN_SUBNET_INFO_JSON': json.dumps({
v.get('name', ''): {'subnet': v.get('subnet', ''), 'prefix': v.get('subnet_mask', 0)}
for v in vlans if v.get('name') and v.get('subnet')
}),
}
tokens['PROTOCOL_OPTIONS'] = json.dumps([
{'value': 'tcp', 'label': 'TCP'},
{'value': 'udp', 'label': 'UDP'},
{'value': 'both', 'label': 'TCP/UDP'},
])
tokens['VLAN_FILTER_OPTIONS'] = filter_opts
tokens['VLAN_NAMES_AS_OPTIONS'] = json.dumps([{'value': n, 'label': n} for n in vlan_names])
tokens['VLAN_SUBNET_INFO_JSON'] = json.dumps({
v.get('name', ''): {'subnet': v.get('subnet', ''), 'prefix': v.get('subnet_mask', 0)}
for v in vlans if v.get('name') and v.get('subnet')
})
content = load_json(f'{PAGES_DIR}/portwrangling/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
return tokens

View file

@ -1,12 +1,13 @@
import json
from flask import session
import sanitize
from config_utils import collect_layout_tokens
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
blank = [{'value': '', 'label': '-- Select timezone --'}]
return {
'PREF_EMAIL': session.get('email_address', ''),
'PREF_TIMEZONE': session.get('timezone', ''),
'TIMEZONE_OPTIONS': json.dumps(blank + [{'value': tz, 'label': tz} for tz in sanitize.VALID_TIMEZONES]),
}
tokens['PREF_EMAIL'] = session.get('email_address', '')
tokens['PREF_TIMEZONE'] = session.get('timezone', '')
tokens['TIMEZONE_OPTIONS'] = json.dumps(blank + [{'value': tz, 'label': tz} for tz in sanitize.VALID_TIMEZONES])
return tokens

View file

@ -1,11 +1,11 @@
import os
from view_common import CONFIGS_DIR
from config_utils import collect_layout_tokens, CONFIGS_DIR
RADIUS_LOG_MAX = 50
RADIUS_LOG_FILE = '/var/log/freeradius/radius.log'
def _radius_log_tail(cfg):
def radius_log_tail(cfg):
try:
log_max_kb = cfg.get('radius', {}).get('general', {}).get('log_max_kb', 1024)
size_kb = os.path.getsize(RADIUS_LOG_FILE) / 1024
@ -32,7 +32,7 @@ def _radius_log_tail(cfg):
def collect_tokens(cfg):
tokens = {}
tokens = collect_layout_tokens(cfg)
try:
tokens['RADIUS_SECRET'] = open(f'{CONFIGS_DIR}/.radius-secret').read().strip()
except OSError:
@ -44,5 +44,5 @@ def collect_tokens(cfg):
tokens['RADIUS_APPLY_TO'] = fr_opts.get('apply_to', 'all')
tokens['RADIUS_LOGGING'] = 'true' if fr_gen.get('logging', False) else ''
tokens['RADIUS_GEN_LOG_MAX_KB'] = str(fr_gen.get('log_max_kb', 1024))
tokens['RADIUS_LOG_TAIL'], tokens['RADIUS_LOG_SUMMARY'] = _radius_log_tail(cfg)
tokens['RADIUS_LOG_TAIL'], tokens['RADIUS_LOG_SUMMARY'] = radius_log_tail(cfg)
return tokens

View file

@ -1,7 +1,29 @@
import json
from config_utils import collect_layout_tokens, load_datasource, fmt_timestamp, fmt_bytes
from factory import run, load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
def live_vpn_sessions():
rows = []
out = run('wg show all dump 2>/dev/null')
for line in out.splitlines():
parts = line.split('\t')
if len(parts) == 9:
interface, pubkey, _psk, endpoint, allowed_ips, last_hs, rx, tx, _ka = parts
rows.append({
'peer_name': pubkey[:16] + '...',
'interface': interface,
'tunnel_ip': allowed_ips.split(',')[0].split('/')[0] if allowed_ips else '-',
'endpoint': endpoint if endpoint != '(none)' else '-',
'last_handshake': fmt_timestamp(int(last_hs)) if last_hs.isdigit() and last_hs != '0' else 'Never',
'rx_bytes': fmt_bytes(int(rx)) if rx.isdigit() else '-',
'tx_bytes': fmt_bytes(int(tx)) if tx.isdigit() else '-',
})
return rows
def collect_tokens(cfg):
tokens = collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
wg_vlans_list = sorted(
[v for v in vlans if v.get('is_vpn')],
@ -23,15 +45,19 @@ def collect_tokens(cfg):
except Exception:
vpn_gateway = ''
return {
'VPN_VLAN_OPTIONS': json.dumps([
{'value': v.get('name', ''), 'label': f'wg{i} (VLAN {v.get("vlan_id") or "?"})'}
for i, v in enumerate(wg_vlans_list)
]),
'VPN_LISTEN_PORT': str(vpn.get('listen_port', '')),
'VPN_SERVER_ENDPOINT': str(vpn.get('server_endpoint', '')),
'VPN_DOMAIN': str(vpn.get('domain', '')),
'VPN_DNS_SERVER': str(overrides.get('dns_servers', '')),
'VPN_MTU': str(overrides.get('mtu', '')),
'VPN_GATEWAY': vpn_gateway,
}
tokens['VPN_VLAN_OPTIONS'] = json.dumps([
{'value': v.get('name', ''), 'label': f'wg{i} (VLAN {v.get("vlan_id") or "?"})'}
for i, v in enumerate(wg_vlans_list)
])
tokens['VPN_LISTEN_PORT'] = str(vpn.get('listen_port', ''))
tokens['VPN_SERVER_ENDPOINT'] = str(vpn.get('server_endpoint', ''))
tokens['VPN_DOMAIN'] = str(vpn.get('domain', ''))
tokens['VPN_DNS_SERVER'] = str(overrides.get('dns_servers', ''))
tokens['VPN_MTU'] = str(overrides.get('mtu', ''))
tokens['VPN_GATEWAY'] = vpn_gateway
content = load_json(f'{PAGES_DIR}/vpn/content.json')
for table_item in iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
rows = live_vpn_sessions() if ds == 'live:vpn_sessions' else load_datasource(ds)
tokens[table_token_key(ds)] = build_table(table_item, tokens, rows)
return tokens