Development
This commit is contained in:
parent
f04b2b36cc
commit
5a3a18d5b0
7 changed files with 123 additions and 33 deletions
|
|
@ -1,5 +1,7 @@
|
|||
import ipaddress
|
||||
import os
|
||||
import glob
|
||||
import subprocess
|
||||
from datetime import datetime, timezone
|
||||
from config_utils import collect_layout_tokens, load_config, relative_time
|
||||
from factory import (
|
||||
|
|
@ -44,6 +46,52 @@ def _vendor_cell(vendor):
|
|||
return e(display)
|
||||
|
||||
|
||||
def _get_arp_table():
|
||||
"""Return {mac_lower: state} from `ip neigh`. Excludes FAILED/PERMANENT/INCOMPLETE."""
|
||||
try:
|
||||
result = subprocess.run(['ip', 'neigh'], capture_output=True, text=True, timeout=5)
|
||||
entries = {}
|
||||
for line in result.stdout.splitlines():
|
||||
parts = line.split()
|
||||
if 'lladdr' not in parts:
|
||||
continue
|
||||
state = parts[-1]
|
||||
if state in ('FAILED', 'PERMANENT', 'NOARP', 'INCOMPLETE'):
|
||||
continue
|
||||
idx = parts.index('lladdr')
|
||||
mac = parts[idx + 1].lower()
|
||||
ip = parts[0]
|
||||
iface = parts[2] if len(parts) > 2 else ''
|
||||
entries[mac] = {'ip': ip, 'iface': iface, 'state': state}
|
||||
return entries
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
def _status_badge(state):
|
||||
if state == 'REACHABLE':
|
||||
return '<span class="badge badge-enabled">Online</span>'
|
||||
return '<span class="badge badge-disabled">Offline</span>'
|
||||
|
||||
|
||||
def _vlan_for_ip(ip_str, vlans):
|
||||
"""Return VLAN name whose subnet contains ip_str, or '-'."""
|
||||
try:
|
||||
addr = ipaddress.IPv4Address(ip_str)
|
||||
except ValueError:
|
||||
return '-'
|
||||
for v in vlans:
|
||||
subnet = v.get('subnet', '')
|
||||
mask = v.get('subnet_mask', '')
|
||||
if subnet and mask:
|
||||
try:
|
||||
if addr in ipaddress.IPv4Network(f'{subnet}/{mask}', strict=False):
|
||||
return v.get('name', '-')
|
||||
except ValueError:
|
||||
pass
|
||||
return '-'
|
||||
|
||||
|
||||
def _parse_lease_secs(s):
|
||||
s = str(s).strip().lower()
|
||||
try:
|
||||
|
|
@ -56,10 +104,13 @@ def _parse_lease_secs(s):
|
|||
|
||||
|
||||
def live_dhcp_leases():
|
||||
rows = []
|
||||
now = int(datetime.now(tz=timezone.utc).timestamp())
|
||||
cfg = load_config()
|
||||
vlans = cfg.get('vlans', [])
|
||||
rows = []
|
||||
now = int(datetime.now(tz=timezone.utc).timestamp())
|
||||
cfg = load_config()
|
||||
vlans = cfg.get('vlans', [])
|
||||
arp_table = _get_arp_table()
|
||||
lease_macs = set()
|
||||
|
||||
vlan_lease_secs = {
|
||||
v['name']: _parse_lease_secs(v.get('dhcp_information', {}).get('lease_time', ''))
|
||||
for v in vlans if v.get('name')
|
||||
|
|
@ -101,15 +152,14 @@ def live_dhcp_leases():
|
|||
device_h = parts[3] if parts[3] != '*' else None
|
||||
res_h = mac_to_res.get(mac_norm)
|
||||
desc = mac_to_desc.get(mac_norm)
|
||||
desc_attr = f' data-hostname-desc="{e(desc)}"' if desc else ''
|
||||
if res_h and device_h and device_h.lower() != res_h.lower():
|
||||
hostname_html = f'<span{desc_attr}>{e(res_h)}<br/>({e(device_h)})</span>' if desc_attr else f'{e(res_h)}<br/>({e(device_h)})'
|
||||
elif res_h:
|
||||
hostname_html = f'<span{desc_attr}>{e(res_h)}</span>' if desc_attr else e(res_h)
|
||||
elif device_h:
|
||||
hostname_html = f'<span{desc_attr}>{e(device_h)}</span>' if desc_attr else e(device_h)
|
||||
name = res_h or device_h
|
||||
if name:
|
||||
desc_attr = f' data-hostname-desc="{e(desc)}"' if desc else ''
|
||||
hostname_html = f'<span{desc_attr}>{e(name)}</span>' if desc_attr else e(name)
|
||||
else:
|
||||
hostname_html = '-'
|
||||
arp_entry = arp_table.get(mac_norm, {})
|
||||
lease_macs.add(mac_norm)
|
||||
rows.append({
|
||||
'hostname': hostname_html,
|
||||
'ip_address': parts[2],
|
||||
|
|
@ -118,9 +168,25 @@ def live_dhcp_leases():
|
|||
'vlan_name': vlan_name,
|
||||
'last_active': last_active,
|
||||
'renews': 'in ' + relative_time(renews_ts or expiry, now, short=True),
|
||||
'status': _status_badge(arp_entry.get('state', '')),
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for mac, arp in arp_table.items():
|
||||
if mac in lease_macs:
|
||||
continue
|
||||
rows.append({
|
||||
'hostname': '-',
|
||||
'ip_address': arp['ip'],
|
||||
'mac_address': mac,
|
||||
'vendor': _vendor_cell(_get_vendor(mac)),
|
||||
'vlan_name': _vlan_for_ip(arp['ip'], vlans),
|
||||
'last_active': '',
|
||||
'renews': '',
|
||||
'status': _status_badge(arp['state']),
|
||||
})
|
||||
|
||||
return rows
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue