from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_core, save_core, verify_core_hash, apply_msg import sanitize import ipaddress as _ipaddress bp = Blueprint('action_apply_vlans', __name__) VIEW = '/view/view_vlans' def _row_index(): try: return int(request.form.get('row_index', '')) except (ValueError, TypeError): return None def _hash_ok(): if not verify_core_hash(request.form.get('config_hash', '')): flash('Configuration was modified by another session. Please refresh and try again.', 'error') return False return True def _derive_vlan_id(subnet, prefix): """Return VLAN ID (1–4094) derived from the active octet of the network address, or None if not derivable. byte_index = (prefix-1) // 8.""" try: network = _ipaddress.ip_network(f'{subnet}/{prefix}', strict=False) octets = list(network.network_address.packed) byte_idx = (prefix - 1) // 8 vlan_id = octets[byte_idx] if 1 <= vlan_id <= 4094: return vlan_id return None except Exception: return None @bp.route('/action/add_vlan', methods=['POST']) @require_level('administrator') def add_vlan(): name = sanitize.name(request.form.get('name', '')) is_vpn = 'is_vpn' in request.form subnet = sanitize.ip(request.form.get('subnet', '')) subnet_mask = sanitize.subnet_mask(request.form.get('subnet_mask', '')) radius_default = 'radius_default' in request.form mdns_reflection = 'mdns_reflection' in request.form use_blocklists = sanitize.filterlist(request.form.getlist('use_blocklists'), {b.get('name') for b in load_core().get('blocklists', [])}) if not name: flash('Name is required.', 'error') return redirect(VIEW) if not subnet: flash('Subnet IP is required.', 'error') return redirect(VIEW) if subnet_mask is None: flash('Invalid subnet prefix (must be 1-30).', 'error') return redirect(VIEW) vlan_id = _derive_vlan_id(subnet, subnet_mask) if vlan_id is None: flash('Cannot derive a valid VLAN ID (1–4094) from this subnet/prefix combination.', 'error') return redirect(VIEW) if not _hash_ok(): return redirect(VIEW) core = load_core() vlans = core.setdefault('vlans', []) if any(v.get('vlan_id') == vlan_id for v in vlans): flash(f'VLAN {vlan_id} (derived from subnet) already exists.', 'error') return redirect(VIEW) entry = { 'vlan_id': vlan_id, 'name': name, 'is_vpn': is_vpn, 'subnet': subnet, 'subnet_mask': subnet_mask, 'use_blocklists': use_blocklists, 'radius_default': radius_default, 'mdns_reflection': mdns_reflection, } if is_vpn: entry['peers'] = [] else: entry['reservations'] = [] vlans.append(entry) save_core(core) flash(apply_msg(), 'success') return redirect(VIEW) @bp.route('/action/edit_vlan', methods=['POST']) @require_level('administrator') def edit_vlan(): idx = _row_index() if idx is None: flash('Invalid request.', 'error') return redirect(VIEW) name = sanitize.name(request.form.get('name', '')) subnet = sanitize.ip(request.form.get('subnet', '')) radius_default = 'radius_default' in request.form mdns_reflection = 'mdns_reflection' in request.form use_blocklists = sanitize.filterlist(request.form.getlist('use_blocklists'), {b.get('name') for b in load_core().get('blocklists', [])}) # subnet_mask is only present when the column is visible (not all edit paths send it). # Validate if submitted; fall back to the stored value otherwise. subnet_mask_raw = request.form.get('subnet_mask') if subnet_mask_raw is not None: subnet_mask = sanitize.subnet_mask(subnet_mask_raw) if subnet_mask is None: flash('Invalid subnet prefix (must be 1-30).', 'error') return redirect(VIEW) else: subnet_mask = None # resolved below after loading core if not name: flash('Name is required.', 'error') return redirect(VIEW) if not subnet: flash('Subnet IP is required.', 'error') return redirect(VIEW) if not _hash_ok(): return redirect(VIEW) core = load_core() vlans = core.get('vlans', []) if idx < 0 or idx >= len(vlans): flash('VLAN not found.', 'error') return redirect(VIEW) existing = vlans[idx] # is_vpn is never changed via edit — toggling it would invalidate peers/reservations. is_vpn = existing.get('is_vpn', False) # Use submitted subnet_mask, or fall back to whatever is already stored. final_mask = subnet_mask if subnet_mask is not None else existing.get('subnet_mask', 24) vlan_id = _derive_vlan_id(subnet, final_mask) if vlan_id is None: flash('Cannot derive a valid VLAN ID (1–4094) from this subnet/prefix combination.', 'error') return redirect(VIEW) current_id = existing.get('vlan_id') if current_id == 1 and vlan_id != 1: flash('VLAN 1 is the physical interface; change its subnet so the derived ID remains 1.', 'error') return redirect(VIEW) if vlan_id != current_id and any(i != idx and v.get('vlan_id') == vlan_id for i, v in enumerate(vlans)): flash(f'VLAN {vlan_id} (derived from subnet) already exists.', 'error') return redirect(VIEW) existing.update({ 'vlan_id': vlan_id, 'name': name, 'is_vpn': is_vpn, 'subnet': subnet, 'subnet_mask': final_mask, 'radius_default': radius_default, 'mdns_reflection': mdns_reflection, 'use_blocklists': use_blocklists, }) save_core(core) flash(apply_msg(), 'success') return redirect(VIEW) @bp.route('/action/delete_vlan', methods=['POST']) @require_level('administrator') def delete_vlan(): idx = _row_index() if idx is None: flash('Invalid request.', 'error') return redirect(VIEW) if not _hash_ok(): return redirect(VIEW) core = load_core() vlans = core.get('vlans', []) if idx < 0 or idx >= len(vlans): flash('VLAN not found.', 'error') return redirect(VIEW) removed = vlans.pop(idx) save_core(core) flash(apply_msg(), 'success') return redirect(VIEW)