diff --git a/docker/routlin-dash/app/action_apply_general.py b/docker/routlin-dash/app/action_apply_general.py
index 04e000e..2be86a0 100644
--- a/docker/routlin-dash/app/action_apply_general.py
+++ b/docker/routlin-dash/app/action_apply_general.py
@@ -14,6 +14,7 @@ def apply_general():
log_errors_only = 'log_errors_only' in request.form
dnsmasq_log_queries = 'dnsmasq_log_queries' in request.form
daily_execute_time = sanitize.time_24h(request.form.get('daily_execute_time_24hr_local', ''))
+ apply_on_save = 'apply_on_save' in request.form
log_max_kb = validate.int_range(log_max_kb_raw, 64, None)
if log_max_kb is None:
@@ -30,6 +31,7 @@ def apply_general():
'log_errors_only': log_errors_only,
'dnsmasq_log_queries': dnsmasq_log_queries,
'daily_execute_time_24hr_local': daily_execute_time,
+ 'apply_on_save': apply_on_save,
})
errors = validate.validate_config(core)
if errors:
diff --git a/docker/routlin-dash/app/action_apply_pending.py b/docker/routlin-dash/app/action_apply_pending.py
new file mode 100644
index 0000000..c08f1cc
--- /dev/null
+++ b/docker/routlin-dash/app/action_apply_pending.py
@@ -0,0 +1,26 @@
+from flask import Blueprint, redirect, flash
+from auth import require_level
+from config_utils import (flush_pending_to_queue, get_dashboard_pending,
+ _is_locked, _format_timing, _seconds_until_next_run)
+
+bp = Blueprint('action_apply_pending', __name__)
+
+
+@bp.route('/action/apply_pending', methods=['POST'])
+@require_level('administrator')
+def apply_pending():
+ items = get_dashboard_pending()
+ if not items:
+ flash('No pending changes to apply.', 'info')
+ return redirect('/view/view_general')
+ flush_pending_to_queue()
+ if _is_locked():
+ msg = 'Changes queued. They are being applied now.'
+ else:
+ timing = _format_timing(_seconds_until_next_run())
+ if timing:
+ msg = f'Changes queued. They will be applied {timing}.'
+ else:
+ msg = 'Changes queued. The processing service is not running.'
+ flash(msg, 'success')
+ return redirect('/view/view_general')
diff --git a/docker/routlin-dash/app/config_utils.py b/docker/routlin-dash/app/config_utils.py
index 1395126..726ac10 100644
--- a/docker/routlin-dash/app/config_utils.py
+++ b/docker/routlin-dash/app/config_utils.py
@@ -8,6 +8,8 @@ DASHBOARD_QUEUE = f'{CONFIGS_DIR}/.dashboard-queue'
DASHBOARD_DONE = f'{CONFIGS_DIR}/.dashboard-done'
DASHBOARD_LAST_RUN = f'{CONFIGS_DIR}/.dashboard-last-run'
DASHBOARD_LOCK = f'{CONFIGS_DIR}/.dashboard-lock'
+DASHBOARD_PENDING = f'{CONFIGS_DIR}/.dashboard-pending'
+STATUS_FILE = f'{CONFIGS_DIR}/.status'
DASHB_TIMER_NAME = 'routlin-dashboard-queue'
PRODUCT_DISPLAY_NAME = os.environ.get('PRODUCT_DISPLAY_NAME', 'Routlin Dashboard')
DASHB_INTERVAL_SECS = 60
@@ -103,7 +105,77 @@ def _trim_if_needed():
pass
-def _queue_command(cmd):
+def _apply_on_save():
+ try:
+ return load_core().get('general', {}).get('apply_on_save', True)
+ except Exception:
+ return True
+
+
+def _read_dashboard_pending():
+ """Return list of (uuid, ts, cmd, user, description) from .dashboard-pending."""
+ items = []
+ try:
+ lines = open(DASHBOARD_PENDING).read().splitlines()
+ except Exception:
+ return items
+ for line in lines:
+ if not line.strip():
+ continue
+ try:
+ main, _, desc = line.partition(' :: ')
+ parts = main.split(None, 3)
+ if len(parts) == 4:
+ entry_uuid, entry_ts, _dt, rest = parts
+ cmd_user = rest.rsplit(' (', 1)
+ entry_cmd = cmd_user[0].strip('[]')
+ entry_user = cmd_user[1].rstrip(')') if len(cmd_user) == 2 else ''
+ items.append((entry_uuid, int(entry_ts), entry_cmd, entry_user, desc))
+ except Exception:
+ pass
+ return items
+
+
+def get_dashboard_pending():
+ return _read_dashboard_pending()
+
+
+def flush_pending_to_queue():
+ """Move all entries from .dashboard-pending to .dashboard-queue and clear pending."""
+ items = _read_dashboard_pending()
+ if not items:
+ return
+ done_set = _load_done_set()
+ existing_ids = {uu for uu, *_ in _read_pending(done_set)}
+ with open(DASHBOARD_QUEUE, 'a') as f:
+ for entry_uuid, entry_ts, entry_cmd, entry_user, _desc in items:
+ if entry_uuid not in existing_ids:
+ dt_str = datetime.fromtimestamp(entry_ts).strftime('%Y-%m-%dT%H:%M:%S')
+ f.write(f'{entry_uuid} {entry_ts} {dt_str} [{entry_cmd}] ({entry_user})\n')
+ open(DASHBOARD_PENDING, 'w').close()
+ _trim_if_needed()
+
+
+def _queue_pending_command(cmd, description=''):
+ """Append cmd to .dashboard-pending if not already present for this cmd+user."""
+ existing = _read_dashboard_pending()
+ current_user = session.get('email_address', 'unknown')
+ for entry_uuid, entry_ts, entry_cmd, entry_user, _desc in existing:
+ if entry_cmd == cmd and entry_user == current_user:
+ return entry_uuid, entry_ts
+ entry_uuid = str(uuid.uuid4())
+ now = datetime.now()
+ entry_ts = int(now.timestamp())
+ dt_str = now.strftime('%Y-%m-%dT%H:%M:%S')
+ desc_suffix = f' :: {description}' if description else ''
+ with open(DASHBOARD_PENDING, 'a') as f:
+ f.write(f'{entry_uuid} {entry_ts} {dt_str} [{cmd}] ({current_user}){desc_suffix}\n')
+ return entry_uuid, entry_ts
+
+
+def _queue_command(cmd, description=''):
+ if not _apply_on_save():
+ return _queue_pending_command(cmd, description)
done_set = _load_done_set()
pending = _read_pending(done_set)
current_user = session.get('email_address', 'unknown')
@@ -155,17 +227,19 @@ def _lock_mtime():
return None
-def queue_command(cmd):
+def queue_command(cmd, description=''):
"""Queue a command without generating a flash message."""
- return _queue_command(cmd)
+ return _queue_command(cmd, description)
-def queued_msg(cmd=None):
+def queued_msg(cmd=None, description=''):
"""Queue cmd if given, then return a timing message.
Without cmd, just returns timing (for commands already queued by the caller)."""
entry_ts = None
if cmd is not None:
- _entry_uuid, entry_ts = queue_command(cmd)
+ _entry_uuid, entry_ts = queue_command(cmd, description)
+ if not _apply_on_save():
+ return 'Configuration saved. Click Apply Now on the Configuration Changes card to apply.'
if _is_locked():
mtime = _lock_mtime()
if entry_ts is not None and mtime and entry_ts < mtime:
@@ -178,7 +252,7 @@ def queued_msg(cmd=None):
return 'Changes queued. The processing service is not running.'
parts = cmd.split()
cli_cmd = f'sudo python3 {parts[0]}.py --{parts[1]}' if len(parts) == 2 else cmd
- install_cmd = f'sudo python3 {parts[0]}.py --install' if len(parts) >= 1 else 'core.py --install'
+ install_cmd = f'sudo python3 install.py'
from markupsafe import Markup
return Markup(f'Configuration saved. The command processing service is not installed. '
f'Run {install_cmd} to enable it, '
diff --git a/docker/routlin-dash/app/main.py b/docker/routlin-dash/app/main.py
index 9161e73..8b5eeea 100644
--- a/docker/routlin-dash/app/main.py
+++ b/docker/routlin-dash/app/main.py
@@ -24,6 +24,7 @@ from action_clear_ddns_log import bp as action_clear_ddns_log_bp
from action_apply_ddns_providers import bp as action_apply_ddns_providers_bp
from action_apply_interface import bp as action_apply_interface_bp
from action_apply_iface_config import bp as action_apply_iface_config_bp
+from action_apply_pending import bp as action_apply_pending_bp
from api_apply_status import bp as api_apply_status_bp
app = Flask(__name__)
@@ -52,6 +53,7 @@ app.register_blueprint(action_clear_ddns_log_bp)
app.register_blueprint(action_apply_ddns_providers_bp)
app.register_blueprint(action_apply_interface_bp)
app.register_blueprint(action_apply_iface_config_bp)
+app.register_blueprint(action_apply_pending_bp)
app.register_blueprint(api_apply_status_bp)
def _seed_initial_account():
diff --git a/docker/routlin-dash/app/view_page.py b/docker/routlin-dash/app/view_page.py
index 401d1af..a92bd18 100644
--- a/docker/routlin-dash/app/view_page.py
+++ b/docker/routlin-dash/app/view_page.py
@@ -4,7 +4,7 @@ import json, re, subprocess, os, sys, html as html_mod
import sanitize
import validation as validate
from datetime import datetime, timezone
-from config_utils import core_hash, get_pending_entries, _seconds_until_next_run, _format_timing, _is_locked, _lock_mtime, PRODUCT_DISPLAY_NAME
+from config_utils import core_hash, get_pending_entries, get_dashboard_pending, _seconds_until_next_run, _format_timing, _is_locked, _lock_mtime, PRODUCT_DISPLAY_NAME
bp = Blueprint('view_page', __name__)
@@ -529,6 +529,38 @@ def collect_tokens():
tokens['GENERAL_LOG_ERRORS_ONLY'] = 'true' if gen.get('log_errors_only') else 'false'
tokens['GENERAL_DNSMASQ_LOG_QUERIES'] = 'true' if gen.get('dnsmasq_log_queries') else 'false'
tokens['GENERAL_DAILY_EXECUTE_TIME'] = str(gen.get('daily_execute_time_24hr_local', '-'))
+ tokens['GENERAL_APPLY_ON_SAVE'] = 'true' if gen.get('apply_on_save', True) else 'false'
+
+ pending_items = get_dashboard_pending()
+ if pending_items:
+ rows = ''
+ for _uuid, ts, cmd, user, desc in pending_items:
+ dt_str = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M')
+ label = e(desc) if desc else e(cmd)
+ rows += (f'
| {e(dt_str)} | '
+ f'{label} | '
+ f'{e(user)} |
')
+ pending_html = (
+ '
'
+ 'Pending Changes
'
+ ''
+ ''
+ ''
+ ''
+ ''
+ '
'
+ f'{rows}'
+ '
'
+ ''
+ )
+ else:
+ pending_html = ''
+ tokens['PENDING_CHANGES_HTML'] = pending_html
servers = dns.get('upstream_servers', [])
tokens['DNS_STRICT_ORDER'] = 'true' if dns.get('strict_order') else 'false'
@@ -895,6 +927,9 @@ def _render_item(item, tokens, inherited_req=None):
if t == 'table':
return _render_table(item, tokens, req)
+ if t == 'raw_html':
+ return Markup(apply_tokens(item.get('html', ''), tokens))
+
return ''
@@ -1364,6 +1399,23 @@ def render_layout(view_id, content_html, tokens):
cls = 'info-bar-warning'
other_bars += f'{text}
\n'
+ problem_bars = ''
+ try:
+ import json as _j
+ st = _j.load(open(f'{CONFIGS_DIR}/.status'))
+ for section in ('configurations', 'logs'):
+ for item in st.get(section, []):
+ if item.get('status') == 'problem':
+ sev = item.get('severity', 'error')
+ cls = 'info-bar-danger' if sev == 'error' else 'info-bar-warning'
+ text = e(item.get('detail', item.get('name', '')))
+ tip = item.get('suggestion', '')
+ if tip:
+ text += f' — {e(tip)}'
+ problem_bars += f'{text}
\n'
+ except Exception:
+ pass
+
return (f'\n\n\n'
f' \n'
f' \n'
@@ -1372,7 +1424,7 @@ def render_layout(view_id, content_html, tokens):
f'\n\n'
f'{titlebar_html}\n'
f'{navbar_html}\n'
- f'\n{other_bars}{content_html}\n\n'
+ f'\n{problem_bars}{other_bars}{content_html}\n\n'
f'{footer_html}\n'
f'\n'
f'\n'
diff --git a/docker/routlin-dash/data/page_content.json b/docker/routlin-dash/data/page_content.json
index c5e6ddf..2a63017 100644
--- a/docker/routlin-dash/data/page_content.json
+++ b/docker/routlin-dash/data/page_content.json
@@ -679,6 +679,47 @@
}
],
"client_requirement": "client_is_administrator+"
+ },
+ {
+ "type": "card",
+ "label": "Configuration Changes",
+ "client_requirement": "client_is_administrator+",
+ "items": [
+ {
+ "type": "form",
+ "action": "/action/apply_general",
+ "method": "post",
+ "items": [
+ {
+ "type": "field",
+ "label": "Apply on Save",
+ "name": "apply_on_save",
+ "input_type": "checkbox",
+ "value": "%GENERAL_APPLY_ON_SAVE%",
+ "hint": "When enabled, saved changes are queued immediately. When disabled, changes accumulate here until you click Apply Now."
+ },
+ {
+ "type": "button_row",
+ "items": [
+ {
+ "type": "button_primary",
+ "text": "Save",
+ "action": "/action/apply_general",
+ "method": "post"
+ },
+ {
+ "type": "button_cancel",
+ "text": "Cancel"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "type": "raw_html",
+ "html": "%PENDING_CHANGES_HTML%"
+ }
+ ]
}
]
},
diff --git a/routlin/USAGE.md b/routlin/USAGE.md
index 3c2f598..47fc8cb 100644
--- a/routlin/USAGE.md
+++ b/routlin/USAGE.md
@@ -23,6 +23,7 @@ All configuration lives in two JSON files. Edit these to match your network befo
| `.dashboard-done` | UUIDs of already-processed queue entries; prevents duplicate execution. |
| `.dashboard-last-run` | Epoch timestamp of the last timer execution. |
| `.dashboard-lock` | PID lock file preventing concurrent timer runs. |
+| `.dashboard-pending` | Changes held back when Apply on Save is disabled; flushed to `.dashboard-queue` when Apply Now is clicked. |
| `.dns-metrics` | Cumulative lifetime DNS metrics across all VLAN instances. Created and updated each time `--view-metrics` is run. |
| `.ddns-last-ip-*` | Cached public IP per DDNS provider. Managed by `ddns.py`. |
| `.ddns-last-service` | Tracks IP-check service rotation. Managed by `ddns.py`. |
diff --git a/routlin/core.py b/routlin/core.py
index e0b439a..ec2e3e5 100644
--- a/routlin/core.py
+++ b/routlin/core.py
@@ -2171,58 +2171,8 @@ def disable_avahi():
def show_status(data):
- import shutil
- col = shutil.get_terminal_size((80, 24)).columns
-
- def svc_row(unit, expected_active="active"):
- r_active = subprocess.run(["systemctl", "is-active", unit], capture_output=True, text=True)
- r_enabled = subprocess.run(["systemctl", "is-enabled", unit], capture_output=True, text=True)
- active = r_active.stdout.strip()
- enabled = r_enabled.stdout.strip()
- active_sym = "+" if active == "active" else "x"
- enabled_sym = "+" if enabled == "enabled" else "x"
- active_ok = "(OK) " if active == expected_active else "(BAD)"
- enabled_ok = "(OK) " if enabled == "enabled" else "(BAD)"
- return active_sym, active, active_ok, enabled_sym, enabled, enabled_ok
-
- units = []
- for vlan in data["vlans"]:
- iface = derive_interface(vlan, data)
- if is_wg(vlan) and not wg_interface_up(iface):
- units.append((vlan_service_name(vlan, iface), "(wg0 not up)", "active"))
- else:
- units.append((vlan_service_name(vlan, iface), None, "active"))
- units.append((f"{BLIST_TIMER_NAME}.timer", None, "active"))
- units.append((NAT_SERVICE_NAME, None, "inactive")) # oneshot - exits after running
- units.append(("freeradius", None, "active"))
- units.append(("avahi-daemon", None, "active"))
-
- print(f" {'UNIT':<45} {'ACTIVE':<18} {'ENABLED'}")
- print(f" {'-'*45} {'-'*18} {'-'*15}")
- for unit, note, expected_active in units:
- if note:
- print(f" {unit:<45} {note}")
- else:
- active_sym, active, active_ok, enabled_sym, enabled, enabled_ok = svc_row(unit, expected_active)
- print(f" {unit:<45} {active_sym} {active:<10} {active_ok} {enabled_sym} {enabled:<10} {enabled_ok}")
-
- # Timer next trigger
- r = subprocess.run(
- ["systemctl", "show", f"{BLIST_TIMER_NAME}.timer", "--property=NextElapseUSecRealtime,NextElapseUSecMonotonic"],
- capture_output=True, text=True
- )
- # Fall back to human-readable 'Trigger' field from status output
- r2 = subprocess.run(
- ["systemctl", "status", f"{BLIST_TIMER_NAME}.timer", "--no-pager"],
- capture_output=True, text=True
- )
- for line in r2.stdout.splitlines():
- line = line.strip()
- if line.startswith("Trigger:"):
- trigger = line.split("Trigger:", 1)[1].strip()
- if trigger and trigger != "n/a":
- print(f"\n Next blocklist update: {trigger}")
- break
+ import status as _status
+ _status.print_table(_status.run_and_write(data))
def show_configs(data):
for vlan in data["vlans"]:
@@ -3167,6 +3117,9 @@ def cmd_apply(data, dry_run=False):
print("Done.")
+ import status as _status
+ _status.print_table(_status.run_and_write(data))
+
def cmd_update_blocklists(data):
"""--update-blocklists: download and merge blocklists. On success, call
diff --git a/routlin/install.py b/routlin/install.py
index a53f58b..05a9412 100644
--- a/routlin/install.py
+++ b/routlin/install.py
@@ -29,6 +29,7 @@ DASHB_QUEUE_FILE = SCRIPT_DIR / ".dashboard-queue"
DASHB_DONE_FILE = SCRIPT_DIR / ".dashboard-done"
DASHB_LAST_RUN_FILE = SCRIPT_DIR / ".dashboard-last-run"
DASHB_LOCK_FILE = SCRIPT_DIR / ".dashboard-lock"
+DASHB_PENDING_FILE = SCRIPT_DIR / ".dashboard-pending"
# ===================================================================
@@ -303,7 +304,7 @@ def setup_docker_compose():
def create_dotfiles():
- for f in (DASHB_QUEUE_FILE, DASHB_DONE_FILE, DASHB_LAST_RUN_FILE, DASHB_LOCK_FILE):
+ for f in (DASHB_QUEUE_FILE, DASHB_DONE_FILE, DASHB_LAST_RUN_FILE, DASHB_LOCK_FILE, DASHB_PENDING_FILE):
if not f.exists():
f.touch()
# chown to the routlin dir owner so the timer can write
diff --git a/routlin/status.py b/routlin/status.py
new file mode 100644
index 0000000..aa6f745
--- /dev/null
+++ b/routlin/status.py
@@ -0,0 +1,760 @@
+"""
+status.py -- System health checks for Routlin.
+
+Reads core.json, checks services, configuration files, and logs, then writes
+.status JSON. Imported by core.py; also runnable standalone.
+
+Public API:
+ run_and_write(data) -> dict run all checks, write .status, return dict
+ print_table(status: dict) render the CLI service table from status dict
+"""
+import hashlib
+import ipaddress
+import json
+import os
+import re
+import shutil
+import socket
+import subprocess
+import sys
+from datetime import datetime, timezone
+from pathlib import Path
+
+from validation import derive_interface, derive_vlan_id, is_wg
+
+# ===================================================================
+# Constants (mirror core.py — no import to avoid circular dependency)
+# ===================================================================
+
+PRODUCT_NAME = "routlin"
+SCRIPT_DIR = Path(__file__).parent
+STATUS_FILE = SCRIPT_DIR / ".status"
+CONFIG_FILE = SCRIPT_DIR / "core.json"
+BLOCKLIST_DIR = SCRIPT_DIR / "blocklists"
+DNSMASQ_CONF_DIR = Path(f"/etc/dnsmasq-{PRODUCT_NAME}")
+LEASES_DIR = Path("/var/lib/misc")
+NETWORKD_DIR = Path("/etc/systemd/network")
+SYSTEMD_DIR = Path("/etc/systemd/system")
+WG_DIR = Path("/etc/wireguard")
+RESOLV_CONF = Path("/etc/resolv.conf")
+AVAHI_CONF_FILE = Path("/etc/avahi/avahi-daemon.conf")
+CHRONY_CONF_FILE = Path("/etc/chrony/chrony.conf")
+RADIUS_SECRET_FILE = SCRIPT_DIR / ".radius-secret"
+RADIUS_CLIENTS_CONF = Path("/etc/freeradius/3.0/clients.conf")
+RADIUS_USERS_FILE = Path("/etc/freeradius/3.0/users")
+BLIST_TIMER_NAME = f"{PRODUCT_NAME}-dns-blocklist-update"
+DASHB_TIMER_NAME = f"{PRODUCT_NAME}-dashboard-queue"
+DASHB_QUEUE_FILE = SCRIPT_DIR / ".dashboard-queue"
+NAT_SERVICE_NAME = f"{PRODUCT_NAME}-nat"
+BLOCKLIST_STALE_SECS = 36 * 3600
+DISK_WARN_PCT = 90
+DHCP_WARN_PCT = 90
+DNS_TIMEOUT_SECS = 2
+
+# ===================================================================
+# Small helpers replicated from core.py (no import)
+# ===================================================================
+
+def _vlan_service_name(vlan, iface):
+ if is_wg(vlan):
+ return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}-{iface}"
+ return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}"
+
+def _radius_enabled(data):
+ return any(
+ r.get("radius_client") is True
+ for v in data.get("vlans", [])
+ for r in v.get("reservations", [])
+ )
+
+def _avahi_enabled(data):
+ return any(
+ v.get("mdns_reflection") is True
+ for v in data.get("vlans", [])
+ if not is_wg(v)
+ )
+
+def _avahi_interfaces(data):
+ return [
+ derive_interface(v, data)
+ for v in data.get("vlans", [])
+ if v.get("mdns_reflection") is True and not is_wg(v)
+ ]
+
+def _combo_hash(names):
+ key = ",".join(sorted(names))
+ return hashlib.sha256(key.encode()).hexdigest()[:8]
+
+def _merged_path(h):
+ return BLOCKLIST_DIR / f"merged-{h}.conf"
+
+def _lowest_quartet_ip(vlan):
+ ips = []
+ for s in vlan.get("server_identities", []):
+ try:
+ ips.append(ipaddress.IPv4Address(s["ip"]))
+ except (KeyError, ValueError):
+ pass
+ return str(min(ips, key=lambda ip: ip.packed[-1])) if ips else None
+
+def _gateway_ips(data):
+ """Return set of all gateway IPs across all VLANs."""
+ gws = set()
+ for vlan in data.get("vlans", []):
+ ip = _lowest_quartet_ip(vlan)
+ if ip:
+ gws.add(ip)
+ return gws
+
+def _iface_operstate(iface):
+ """Read operstate from sysfs. Returns 'up', 'down', 'unknown', or None."""
+ try:
+ return Path(f"/sys/class/net/{iface}/operstate").read_text().strip()
+ except OSError:
+ return None
+
+def _sysctl_query(unit):
+ """Return (active, enabled) strings from systemctl."""
+ r_a = subprocess.run(["systemctl", "is-active", unit], capture_output=True, text=True)
+ r_e = subprocess.run(["systemctl", "is-enabled", unit], capture_output=True, text=True)
+ return r_a.stdout.strip(), r_e.stdout.strip()
+
+# ===================================================================
+# Result builders
+# ===================================================================
+
+def _ok(id_, name, detail=""):
+ r = {"id": id_, "name": name, "status": "ok"}
+ if detail:
+ r["detail"] = detail
+ return r
+
+def _problem(id_, name, severity, detail, suggestion=""):
+ r = {"id": id_, "name": name, "status": "problem",
+ "severity": severity, "detail": detail}
+ if suggestion:
+ r["suggestion"] = suggestion
+ return r
+
+# ===================================================================
+# Services checks
+# ===================================================================
+
+def check_services(data):
+ results = []
+ vlans = data.get("vlans", [])
+
+ units = []
+ for vlan in vlans:
+ iface = derive_interface(vlan, data)
+ name = _vlan_service_name(vlan, iface)
+ units.append({"id": name, "name": name,
+ "expected_active": "active", "expected_enabled": "enabled"})
+
+ units.append({"id": f"{BLIST_TIMER_NAME}.timer",
+ "name": f"{BLIST_TIMER_NAME}.timer",
+ "expected_active": "active", "expected_enabled": "enabled"})
+
+ units.append({"id": NAT_SERVICE_NAME,
+ "name": NAT_SERVICE_NAME,
+ "expected_active": "inactive",
+ "expected_enabled": "enabled"})
+
+ if DASHB_QUEUE_FILE.exists():
+ units.append({"id": f"{DASHB_TIMER_NAME}.timer",
+ "name": f"{DASHB_TIMER_NAME}.timer",
+ "expected_active": "active", "expected_enabled": "enabled"})
+
+ exp_fr_active = "active" if _radius_enabled(data) else "inactive"
+ exp_fr_enabled = "enabled" if _radius_enabled(data) else "disabled"
+ units.append({"id": "freeradius", "name": "freeradius",
+ "expected_active": exp_fr_active,
+ "expected_enabled": exp_fr_enabled})
+
+ exp_av_active = "active" if _avahi_enabled(data) else "inactive"
+ exp_av_enabled = "enabled" if _avahi_enabled(data) else "disabled"
+ units.append({"id": "avahi-daemon", "name": "avahi-daemon",
+ "expected_active": exp_av_active,
+ "expected_enabled": exp_av_enabled})
+
+ units.append({"id": "chrony", "name": "chrony",
+ "expected_active": "active", "expected_enabled": "enabled"})
+ units.append({"id": "systemd-networkd", "name": "systemd-networkd",
+ "expected_active": "active", "expected_enabled": "enabled"})
+
+ for u in units:
+ active, enabled = _sysctl_query(u["id"])
+ exp_active = u["expected_active"]
+ exp_enabled = u["expected_enabled"]
+ active_ok = active == exp_active
+ enabled_ok = enabled == exp_enabled
+ status = "ok" if (active_ok and enabled_ok) else "problem"
+ results.append({
+ "id": u["id"],
+ "name": u["name"],
+ "active": active,
+ "enabled": enabled,
+ "expected_active": exp_active,
+ "expected_enabled": exp_enabled,
+ "active_ok": active_ok,
+ "enabled_ok": enabled_ok,
+ "status": status,
+ })
+
+ return results
+
+# ===================================================================
+# Configuration checks
+# ===================================================================
+
+def check_configurations(data):
+ results = []
+ vlans = data.get("vlans", [])
+ non_wg = [v for v in vlans if not is_wg(v)]
+ wg_vlans = [v for v in vlans if is_wg(v)]
+ core_mtime = CONFIG_FILE.stat().st_mtime if CONFIG_FILE.exists() else 0
+
+ def file_ok(id_, name, path, severity="error", suggestion=""):
+ if not path.exists():
+ return _problem(id_, name, severity,
+ f"{path} does not exist.",
+ suggestion or f"Run sudo python3 core.py --apply to create it.")
+ if path.stat().st_mtime < core_mtime:
+ return _problem(id_, name, "warning",
+ f"{path} is older than core.json and may be stale.",
+ "Run sudo python3 core.py --apply to update it.")
+ return _ok(id_, name)
+
+ # --- nftables tables ---
+ try:
+ tables_out = subprocess.run(
+ ["nft", "list", "tables"], capture_output=True, text=True
+ ).stdout
+ for tbl in ("ip routlin-nat", "ip routlin-filter"):
+ if tbl in tables_out:
+ results.append(_ok(f"nft_{tbl.replace(' ', '_')}",
+ f"nftables table {tbl}"))
+ else:
+ results.append(_problem(
+ f"nft_{tbl.replace(' ', '_')}",
+ f"nftables table {tbl}",
+ "error",
+ f"nftables table '{tbl}' is missing.",
+ "Run sudo python3 core.py --apply to rebuild firewall rules."))
+ except Exception:
+ results.append(_problem("nft_tables", "nftables tables", "error",
+ "Could not query nftables (nft not available or failed)."))
+
+ # --- Docker bridge rules ---
+ try:
+ bridges = [
+ p.parent.name
+ for p in Path("/sys/class/net").glob("*/bridge")
+ if _iface_operstate(p.parent.name) == "up"
+ ]
+ if bridges:
+ fwd_out = subprocess.run(
+ ["nft", "list", "chain", "ip", "routlin-filter", "forward"],
+ capture_output=True, text=True
+ ).stdout
+ missing = [b for b in bridges if b not in fwd_out]
+ if missing:
+ results.append(_problem(
+ "nft_docker_bridges", "nftables Docker bridge rules", "warning",
+ f"Container bridge(s) {', '.join(missing)} have no nftables forward rules.",
+ "Run sudo python3 core.py --apply to add the missing rules."))
+ else:
+ results.append(_ok("nft_docker_bridges", "nftables Docker bridge rules"))
+ except Exception:
+ pass
+
+ # --- VLAN sub-interfaces ---
+ for vlan in non_wg:
+ iface = derive_interface(vlan, data)
+ vid = derive_vlan_id(vlan.get("subnet", ""), vlan.get("subnet_mask", 24))
+ state = _iface_operstate(iface)
+ id_ = f"iface_{vlan['name']}"
+ name = f"interface {iface}"
+ if state is None:
+ results.append(_problem(id_, name, "error",
+ f"Interface {iface} does not exist in /sys/class/net/.",
+ "Run sudo python3 core.py --apply to configure network interfaces."))
+ elif state != "up":
+ results.append(_problem(id_, name, "error",
+ f"Interface {iface} operstate is '{state}' (expected 'up').",
+ "Check systemd-networkd: sudo systemctl status systemd-networkd"))
+ else:
+ results.append(_ok(id_, name))
+
+ # --- WireGuard interfaces ---
+ for vlan in wg_vlans:
+ iface = derive_interface(vlan, data)
+ state = _iface_operstate(iface)
+ id_ = f"iface_wg_{vlan['name']}"
+ name = f"WireGuard interface {iface}"
+ if state is None:
+ results.append(_problem(id_, name, "error",
+ f"WireGuard interface {iface} does not exist.",
+ "Run sudo python3 core.py --apply to bring up WireGuard."))
+ elif state != "up":
+ results.append(_problem(id_, name, "error",
+ f"WireGuard interface {iface} operstate is '{state}'.",
+ f"Try: sudo wg-quick up {iface}"))
+ else:
+ results.append(_ok(id_, name))
+
+ # --- Stale WG interfaces when no WG VLANs configured ---
+ if not wg_vlans:
+ stale_wg = [
+ p.name for p in Path("/sys/class/net").iterdir()
+ if p.name.startswith("wg") and re.match(r"^wg\d+$", p.name)
+ ]
+ if stale_wg:
+ results.append(_problem(
+ "stale_wg_ifaces", "Stale WireGuard interfaces", "warning",
+ f"WireGuard interface(s) {', '.join(stale_wg)} exist but no VPN VLANs are configured.",
+ f"Bring them down manually: sudo wg-quick down {stale_wg[0]}"))
+
+ # --- dnsmasq config files ---
+ for vlan in vlans:
+ path = DNSMASQ_CONF_DIR / f"{vlan['name']}.conf"
+ results.append(file_ok(f"dnsmasq_conf_{vlan['name']}",
+ f"dnsmasq config {path.name}", path))
+
+ # --- systemd-networkd files ---
+ for vlan in non_wg:
+ iface = derive_interface(vlan, data)
+ vid = derive_vlan_id(vlan.get("subnet", ""), vlan.get("subnet_mask", 24))
+ net = NETWORKD_DIR / f"10-{PRODUCT_NAME}-{vlan['name']}.network"
+ results.append(file_ok(f"networkd_net_{vlan['name']}",
+ f"networkd {net.name}", net))
+ if vid != 1: # non-physical VLANs have a .netdev too
+ netdev = NETWORKD_DIR / f"10-{PRODUCT_NAME}-{vlan['name']}.netdev"
+ results.append(file_ok(f"networkd_netdev_{vlan['name']}",
+ f"networkd {netdev.name}", netdev))
+
+ # --- systemd unit files ---
+ for path in (SYSTEMD_DIR / f"{NAT_SERVICE_NAME}.service",
+ SYSTEMD_DIR / f"{BLIST_TIMER_NAME}.timer",
+ SYSTEMD_DIR / f"{BLIST_TIMER_NAME}.service"):
+ results.append(file_ok(f"unit_{path.stem}", f"systemd unit {path.name}", path))
+
+ # --- WireGuard config and key files ---
+ for vlan in wg_vlans:
+ iface = derive_interface(vlan, data)
+ conf = WG_DIR / f"{iface}.conf"
+ key = WG_DIR / f"{iface}.key"
+ pub = SCRIPT_DIR / f".{iface}.pub"
+ results.append(file_ok(f"wg_conf_{iface}", f"WireGuard {conf.name}", conf))
+ results.append(file_ok(f"wg_key_{iface}", f"WireGuard {key.name}", key))
+ results.append(file_ok(f"wg_pubkey_{iface}", f"WireGuard {pub.name}", pub))
+
+ # --- Stale WG conf files when no WG VLANs ---
+ if not wg_vlans and WG_DIR.exists():
+ stale = [
+ p for p in WG_DIR.glob("wg*.conf")
+ if p.read_text().startswith("# Generated by")
+ ]
+ if stale:
+ results.append(_problem(
+ "stale_wg_conf", "Stale WireGuard config files", "warning",
+ f"{', '.join(p.name for p in stale)} exist but no VPN VLANs are configured.",
+ "Remove with: sudo rm " + " ".join(str(p) for p in stale)))
+
+ # --- RADIUS files and secret check ---
+ if _radius_enabled(data):
+ results.append(file_ok("radius_secret_file", ".radius-secret file",
+ RADIUS_SECRET_FILE, "error"))
+ results.append(file_ok("radius_clients_conf", "FreeRADIUS clients.conf",
+ RADIUS_CLIENTS_CONF, "error"))
+ results.append(file_ok("radius_users_file", "FreeRADIUS users",
+ RADIUS_USERS_FILE, "error"))
+
+ # Secret content match
+ try:
+ secret = RADIUS_SECRET_FILE.read_text().strip()
+ conf_text = RADIUS_CLIENTS_CONF.read_text()
+ secret_ok = any(
+ line.strip().split("=", 1)[-1].strip() == secret
+ for line in conf_text.splitlines()
+ if "secret" in line and not line.strip().startswith("#")
+ )
+ if secret_ok:
+ results.append(_ok("radius_secret_match", "FreeRADIUS shared secret"))
+ else:
+ results.append(_problem(
+ "radius_secret_match", "FreeRADIUS shared secret", "error",
+ "clients.conf secret does not match .radius-secret. "
+ "Access points will reject all authentication requests.",
+ "Restore .radius-secret from backup, or run sudo python3 core.py --apply "
+ "then update the shared secret in your AP controller."))
+ except OSError:
+ pass # already caught above by file_ok
+ else:
+ # RADIUS not enabled — warn if generated config files still exist
+ if RADIUS_CLIENTS_CONF.exists():
+ try:
+ if "# Generated by" in RADIUS_CLIENTS_CONF.read_text():
+ results.append(_problem(
+ "radius_conf_orphan", "FreeRADIUS config", "warning",
+ "FreeRADIUS clients.conf contains routlin-generated content "
+ "but RADIUS is not enabled.",
+ "This is harmless if freeradius is stopped. "
+ "Remove with: sudo rm " + str(RADIUS_CLIENTS_CONF)))
+ except OSError:
+ pass
+
+ # --- Avahi config ---
+ if _avahi_enabled(data):
+ results.append(file_ok("avahi_conf", "avahi-daemon.conf",
+ AVAHI_CONF_FILE, "warning"))
+ if AVAHI_CONF_FILE.exists():
+ expected_ifaces = set(_avahi_interfaces(data))
+ try:
+ text = AVAHI_CONF_FILE.read_text()
+ m = re.search(r"allow-interfaces\s*=\s*(.+)", text)
+ if m:
+ actual_ifaces = {i.strip() for i in m.group(1).split(",")}
+ missing = expected_ifaces - actual_ifaces
+ extra = actual_ifaces - expected_ifaces
+ if missing or extra:
+ results.append(_problem(
+ "avahi_ifaces", "avahi-daemon interface list", "warning",
+ f"avahi-daemon.conf interface list does not match config "
+ f"(missing: {missing or 'none'}, extra: {extra or 'none'}).",
+ "Run sudo python3 core.py --apply to update."))
+ else:
+ results.append(_ok("avahi_ifaces",
+ "avahi-daemon interface list"))
+ except OSError:
+ pass
+
+ # --- resolv.conf ---
+ gateway_ips = _gateway_ips(data)
+ try:
+ resolv = RESOLV_CONF.read_text()
+ ns_ips = {
+ line.split()[1]
+ for line in resolv.splitlines()
+ if line.startswith("nameserver") and len(line.split()) >= 2
+ }
+ if ns_ips & gateway_ips:
+ results.append(_ok("resolv_conf", "/etc/resolv.conf"))
+ else:
+ results.append(_problem(
+ "resolv_conf", "/etc/resolv.conf", "warning",
+ f"/etc/resolv.conf nameserver(s) {ns_ips} do not include any VLAN gateway. "
+ f"Expected one of: {gateway_ips}.",
+ "Run sudo python3 core.py --apply to update /etc/resolv.conf."))
+ except OSError:
+ results.append(_problem("resolv_conf", "/etc/resolv.conf", "warning",
+ "/etc/resolv.conf is not readable.",
+ "Run sudo python3 core.py --apply."))
+
+ # --- chrony.conf ---
+ if CHRONY_CONF_FILE.exists():
+ try:
+ content = CHRONY_CONF_FILE.read_text()
+ missing_subnets = []
+ for vlan in non_wg:
+ try:
+ network = ipaddress.IPv4Network(
+ f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
+ cidr = str(network)
+ if f"allow {cidr}" not in content and f"allow {vlan['subnet']}" not in content:
+ missing_subnets.append(cidr)
+ except Exception:
+ pass
+ if missing_subnets:
+ results.append(_problem(
+ "chrony_conf", "/etc/chrony/chrony.conf", "warning",
+ f"chrony.conf is missing allow directives for: {', '.join(missing_subnets)}.",
+ "Run sudo python3 core.py --apply to update chrony.conf."))
+ else:
+ results.append(_ok("chrony_conf", "/etc/chrony/chrony.conf"))
+ except OSError:
+ results.append(_problem("chrony_conf", "/etc/chrony/chrony.conf", "warning",
+ "/etc/chrony/chrony.conf is not readable."))
+ else:
+ results.append(_problem("chrony_conf", "/etc/chrony/chrony.conf", "warning",
+ "/etc/chrony/chrony.conf does not exist.",
+ "Install chrony: sudo apt-get install chrony"))
+
+ # --- Stale WG conf when no WG VLANs (already handled above) ---
+
+ # --- DHCP pool utilization ---
+ for vlan in non_wg:
+ try:
+ dhcp = vlan.get("dhcp_information", {})
+ start = dhcp.get("pool_start", "")
+ end = dhcp.get("pool_end", "")
+ if not start or not end:
+ continue
+ pool_size = (int(ipaddress.IPv4Address(end))
+ - int(ipaddress.IPv4Address(start)) + 1)
+ if pool_size <= 0:
+ continue
+ lease_file = LEASES_DIR / f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}.leases"
+ if not lease_file.exists():
+ continue
+ leases = [
+ l for l in lease_file.read_text().splitlines()
+ if l.strip() and not l.startswith("#")
+ ]
+ pct = len(leases) * 100 // pool_size
+ if pct >= DHCP_WARN_PCT:
+ results.append(_problem(
+ f"dhcp_pool_{vlan['name']}",
+ f"DHCP pool ({vlan['name']})", "warning",
+ f"DHCP pool for VLAN '{vlan['name']}' is {pct}% full "
+ f"({len(leases)}/{pool_size} leases).",
+ "Expand the pool range in core.json or clean up stale leases "
+ "with: sudo python3 core.py --reset-leases " + vlan['name']))
+ else:
+ results.append(_ok(f"dhcp_pool_{vlan['name']}",
+ f"DHCP pool ({vlan['name']})",
+ f"{pct}% used ({len(leases)}/{pool_size})"))
+ except Exception:
+ pass
+
+ # --- Blocklist file freshness ---
+ blocklists = data.get("blocklists", [])
+ if blocklists:
+ combos = {}
+ for vlan in vlans:
+ names = vlan.get("use_blocklists", [])
+ if names:
+ combos[_combo_hash(names)] = names
+ now = datetime.now(timezone.utc).timestamp()
+ for h, names in combos.items():
+ path = _merged_path(h)
+ label = ", ".join(names)
+ if not path.exists():
+ results.append(_problem(
+ f"blocklist_{h}", f"blocklist ({label})", "warning",
+ f"Merged blocklist file for '{label}' does not exist.",
+ "Run sudo python3 core.py --update-blocklists to download blocklists."))
+ elif now - path.stat().st_mtime > BLOCKLIST_STALE_SECS:
+ age_h = int((now - path.stat().st_mtime) / 3600)
+ results.append(_problem(
+ f"blocklist_{h}", f"blocklist ({label})", "warning",
+ f"Merged blocklist for '{label}' is {age_h}h old (threshold 36h).",
+ "Run sudo python3 core.py --update-blocklists to refresh."))
+ else:
+ results.append(_ok(f"blocklist_{h}", f"blocklist ({label})"))
+
+ # --- Disk space ---
+ try:
+ usage = shutil.disk_usage("/")
+ pct = usage.used * 100 // usage.total
+ if pct >= DISK_WARN_PCT:
+ results.append(_problem(
+ "disk_space", "Disk space", "warning",
+ f"Root filesystem is {pct}% full "
+ f"({usage.used // 1_073_741_824}G of {usage.total // 1_073_741_824}G used).",
+ "Free up disk space to avoid service disruption."))
+ else:
+ results.append(_ok("disk_space", "Disk space",
+ f"{pct}% used"))
+ except Exception:
+ pass
+
+ # --- Upstream DNS reachability ---
+ servers = data.get("upstream_dns", {}).get("upstream_servers", [])
+ unreachable = []
+ for srv in servers:
+ try:
+ with socket.create_connection((srv, 53), timeout=DNS_TIMEOUT_SECS):
+ pass
+ except OSError:
+ unreachable.append(srv)
+ if unreachable:
+ results.append(_problem(
+ "upstream_dns", "Upstream DNS reachability", "warning",
+ f"Upstream DNS server(s) unreachable on port 53: {', '.join(unreachable)}.",
+ "Check WAN connectivity and upstream DNS server addresses in core.json."))
+ elif servers:
+ results.append(_ok("upstream_dns", "Upstream DNS reachability"))
+
+ return results
+
+# ===================================================================
+# Log checks
+# ===================================================================
+
+def check_logs(data):
+ results = []
+
+ # --- FreeRADIUS auth failures ---
+ radius_log = Path("/var/log/freeradius/radius.log")
+ if radius_log.exists():
+ try:
+ now = datetime.now(timezone.utc).timestamp()
+ cutoff = now - 3600
+ lines = radius_log.read_text(errors="replace").splitlines()
+ # Parse lines with timestamps like "Thu May 21 11:53:47 2026 : Info: ..."
+ recent = []
+ failure_re = re.compile(r"Shared secret is incorrect")
+ ts_re = re.compile(
+ r"(\w+ \w+ +\d+ \d+:\d+:\d+ \d+) : ")
+ for line in lines[-2000:]: # scan last 2000 lines
+ m = ts_re.match(line)
+ if not m:
+ continue
+ try:
+ ts = datetime.strptime(m.group(1), "%a %b %d %H:%M:%S %Y")
+ ts = ts.replace(tzinfo=timezone.utc)
+ if ts.timestamp() >= cutoff:
+ recent.append(line)
+ except ValueError:
+ pass
+
+ failures = [l for l in recent if failure_re.search(l)]
+ if failures:
+ # Extract distinct AP names from "(from client ...)" pattern
+ ap_re = re.compile(r"\(from client ([^)]+)\)")
+ aps = sorted({m.group(1) for l in failures
+ for m in ap_re.finditer(l)})
+ ap_str = ", ".join(aps) if aps else f"{len(failures)} request(s)"
+ results.append(_problem(
+ "freeradius_auth_failures",
+ "FreeRADIUS auth failures", "error",
+ f"FreeRADIUS is rejecting requests from {ap_str} with "
+ f"'Shared secret is incorrect' ({len(failures)} failures in the last hour).",
+ "Restore .radius-secret from backup and run sudo python3 core.py --apply, "
+ "or update the shared secret in your AP controller to match .radius-secret."))
+ else:
+ results.append(_ok("freeradius_auth_failures",
+ "FreeRADIUS auth failures"))
+
+ # High rejection rate (>50% of recent activity is failures)
+ if recent and len(failures) > len(recent) * 0.5 and not failures:
+ results.append(_problem(
+ "freeradius_high_reject_rate",
+ "FreeRADIUS rejection rate", "warning",
+ f"Over half of recent FreeRADIUS activity ({len(failures)}/{len(recent)}) "
+ f"are auth failures.",
+ "Investigate FreeRADIUS config and shared secrets."))
+ elif recent:
+ results.append(_ok("freeradius_high_reject_rate",
+ "FreeRADIUS rejection rate"))
+
+ except OSError:
+ pass
+
+ # --- dnsmasq errors ---
+ try:
+ r = subprocess.run(
+ ["journalctl", f"-u", f"dnsmasq-{PRODUCT_NAME}-*",
+ "--since", "-1h", "--priority=err", "--no-pager", "-q"],
+ capture_output=True, text=True, timeout=5
+ )
+ err_lines = [l for l in r.stdout.splitlines() if l.strip()]
+ if err_lines:
+ results.append(_problem(
+ "dnsmasq_errors", "dnsmasq errors", "error",
+ f"{len(err_lines)} dnsmasq error(s) in the last hour: "
+ f"{err_lines[0][:120]}{'...' if len(err_lines) > 1 else ''}",
+ "Check dnsmasq logs: sudo journalctl -u 'dnsmasq-routlin-*' --since -1h"))
+ else:
+ results.append(_ok("dnsmasq_errors", "dnsmasq errors"))
+ except Exception:
+ pass
+
+ return results
+
+# ===================================================================
+# Next blocklist update
+# ===================================================================
+
+def _next_blocklist_update():
+ try:
+ r = subprocess.run(
+ ["systemctl", "status", f"{BLIST_TIMER_NAME}.timer", "--no-pager"],
+ capture_output=True, text=True, timeout=5
+ )
+ for line in r.stdout.splitlines():
+ line = line.strip()
+ if line.startswith("Trigger:"):
+ trigger = line.split("Trigger:", 1)[1].strip()
+ if trigger and trigger != "n/a":
+ return trigger
+ except Exception:
+ pass
+ return None
+
+# ===================================================================
+# Public API
+# ===================================================================
+
+def run_and_write(data):
+ """Run all checks, write .status atomically, return the status dict."""
+ status = {
+ "checked_at": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
+ "services": check_services(data),
+ "configurations": check_configurations(data),
+ "logs": check_logs(data),
+ "next_blocklist_update": _next_blocklist_update(),
+ }
+ tmp = STATUS_FILE.with_suffix(".tmp")
+ tmp.write_text(json.dumps(status, indent=2))
+ tmp.replace(STATUS_FILE)
+ return status
+
+
+def print_table(status):
+ """Print the service status table and any problems to stdout."""
+ col = shutil.get_terminal_size((80, 24)).columns
+
+ services = status.get("services", [])
+ print(f"\n {'UNIT':<45} {'ACTIVE':<18} {'ENABLED'}")
+ print(f" {'-'*45} {'-'*18} {'-'*15}")
+ for svc in services:
+ active = svc.get("active", "unknown")
+ enabled = svc.get("enabled", "unknown")
+ a_ok = svc.get("active_ok", True)
+ e_ok = svc.get("enabled_ok", True)
+ a_sym = "+" if active == "active" else "x"
+ e_sym = "+" if enabled == "enabled" else "x"
+ a_status = "(OK) " if a_ok else "(BAD)"
+ e_status = "(OK) " if e_ok else "(BAD)"
+ print(f" {svc['name']:<45} "
+ f"{a_sym} {active:<10} {a_status} "
+ f"{e_sym} {enabled:<10} {e_status}")
+
+ trigger = status.get("next_blocklist_update")
+ if trigger:
+ print(f"\n Next blocklist update: {trigger}")
+
+ problems = [
+ item
+ for section in ("configurations", "logs")
+ for item in status.get(section, [])
+ if item.get("status") == "problem"
+ ]
+ if problems:
+ print(f"\n Problems {'=' * (col - 12)}")
+ for p in problems:
+ sev = p.get("severity", "error")
+ tag = f"[{sev}]"
+ detail = p.get("detail", p.get("name", ""))
+ print(f" {tag:<10} {detail}")
+ tip = p.get("suggestion", "")
+ if tip:
+ print(f" {'':10} -> {tip}")
+ print()
+
+
+# ===================================================================
+# Standalone entry point
+# ===================================================================
+
+if __name__ == "__main__":
+ try:
+ with open(CONFIG_FILE) as f:
+ data = json.load(f)
+ except Exception as ex:
+ print(f"Error loading {CONFIG_FILE}: {ex}", file=sys.stderr)
+ sys.exit(1)
+ status = run_and_write(data)
+ print_table(status)
diff --git a/routlin/validation.py b/routlin/validation.py
index 50a5180..c0bf581 100644
--- a/routlin/validation.py
+++ b/routlin/validation.py
@@ -701,6 +701,19 @@ def validate_config(data):
errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). "
f"Only one VLAN may be the RADIUS default.")
+ # -- RADIUS requires multiple VLANs ----------------------------------------
+ non_wg_vlans = [v for v in data.get("vlans", []) if not is_wg(v)]
+ has_radius_clients = any(
+ r.get("radius_client")
+ for v in non_wg_vlans
+ for r in v.get("reservations", [])
+ )
+ if has_radius_clients and len(non_wg_vlans) < 2:
+ errors.append(
+ "RADIUS clients are configured but only one non-VPN VLAN exists. "
+ "Dynamic VLAN assignment requires at least two VLANs."
+ )
+
# -- host_overrides validation ---------------------------------------------
all_vlan_nets = list(vlan_networks.values())
for idx, entry in enumerate(data.get("host_overrides", [])):