import ipaddress
import os
import glob
from datetime import datetime, timezone

from config_utils import collect_layout_tokens, load_config, relative_time
from factory import (
    load_json,
    build_table,
    table_token_key,
    iter_table_items,
    PAGES_DIR,
    e,
)

# Both vendor databases are optional. If either package is missing or fails
# to initialise, the corresponding lookup is simply disabled.
try:
    import manuf as _manuf_mod
    _mac_parser = _manuf_mod.MacParser()
except Exception:
    _mac_parser = None

try:
    from mac_vendor_lookup import MacLookup as _MacLookup
    _mac_lookup = _MacLookup()
except Exception:
    _mac_lookup = None

# Seconds per unit suffix accepted by _parse_lease_secs.
_LEASE_UNIT_SECS = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400, 'w': 604800}


def _get_vendor(mac):
    """Return ``(short, long)`` vendor names for *mac*.

    ``short`` comes from the ``manuf`` parser and ``long`` from
    ``mac_vendor_lookup``. Either is ``''`` when its database is unavailable,
    has no match, or the lookup raises.
    """
    short, long = '', ''
    if _mac_parser:
        try:
            short = _mac_parser.get_manuf(mac) or ''
        except Exception:
            # Best-effort: a failed vendor lookup must never break the page.
            pass
    if _mac_lookup:
        try:
            long = _mac_lookup.lookup(mac) or ''
        except Exception:
            pass
    return (short, long)


def _vendor_cell(vendor):
    """Render the vendor table cell from a ``(short, long)`` tuple.

    Prefers the short name, falls back to the first 8 characters of the long
    name, and returns ``'-'`` when neither is known.
    """
    short, long = vendor
    display = short if short else (long[:8] if long else '')
    if not display:
        return '-'
    # NOTE(review): both branches below currently render the same text; the
    # long name is not emitted anywhere. Confirm whether markup (e.g. a
    # tooltip carrying the long name) was intended here.
    if long:
        return f'{e(display)}'
    return e(display)


def _get_arp_table():
    """Return {mac_lower: entry} from /proc/net/arp (host-mounted).

    ATF_COM (0x2) flag means the entry is complete; entries without it
    (incomplete) are excluded, as are all-zero MAC addresses. Lines with too
    few fields or a non-hex flags field are skipped individually. Returns
    ``{}`` when the file cannot be read.
    """
    entries = {}
    try:
        with open('/host/proc/net/arp') as f:
            next(f, None)  # skip header line (tolerates an empty file)
            for line in f:
                parts = line.split()
                if len(parts) < 6:
                    continue
                try:
                    flags = int(parts[2], 16)
                except ValueError:
                    # One malformed line must not discard the whole table.
                    continue
                if not (flags & 0x2):
                    continue
                mac = parts[3].lower()
                if mac == '00:00:00:00:00:00':
                    continue
                entries[mac] = {
                    'ip': parts[0],
                    'iface': parts[5],
                    'state': 'REACHABLE',
                }
    except (OSError, UnicodeDecodeError):
        return {}
    return entries


def _status_badge(state):
    """Map an ARP state string to the status cell text."""
    if state == 'REACHABLE':
        return 'Online'
    return 'Offline'


def _vlan_for_ip(ip_str, vlans):
    """Return VLAN name whose subnet contains ip_str, or '-'.

    Each VLAN dict is matched via its ``subnet`` and ``subnet_mask`` keys;
    VLANs with missing or unparsable values are ignored.
    """
    try:
        addr = ipaddress.IPv4Address(ip_str)
    except ValueError:
        return '-'
    for v in vlans:
        subnet = v.get('subnet', '')
        mask = v.get('subnet_mask', '')
        if subnet and mask:
            try:
                network = ipaddress.IPv4Network(f'{subnet}/{mask}', strict=False)
            except ValueError:
                continue
            if addr in network:
                return v.get('name', '-')
    return '-'


def _parse_lease_secs(s):
    """Convert a lease-time setting to seconds, or ``None`` if unparsable.

    Accepts an integer with an optional ``s``/``m``/``h``/``d``/``w`` suffix
    (case-insensitive). A bare integer is taken as seconds. Anything else
    (including ``'infinite'``, an empty value, or a negative amount) yields
    ``None``.
    """
    s = str(s).strip().lower()
    if not s:
        return None
    factor = _LEASE_UNIT_SECS.get(s[-1])
    if factor is None:
        digits, factor = s, 1
    else:
        digits = s[:-1]
    try:
        secs = int(digits) * factor
    except ValueError:
        return None
    return secs if secs >= 0 else None


def _lease_timing(expiry, lease_secs, now):
    """Return ``(last_active, renews)`` display strings for one lease.

    The time the lease was obtained and its half-life renewal time are
    derived from *expiry* and the VLAN's configured *lease_secs*; when
    *lease_secs* is unknown only the expiry can be shown.
    """
    obtained_ts = (expiry - lease_secs) if lease_secs else None
    renews_ts = (expiry - lease_secs // 2) if lease_secs else None

    if obtained_ts is None:
        last_active = '-'
    elif obtained_ts <= now:
        last_active = relative_time(obtained_ts, now, short=True) + ' ago'
    elif renews_ts and renews_ts > now:
        last_active = 'ETA ' + relative_time(renews_ts, now, short=True)
    else:
        last_active = 'ETA soon'

    # The cell reads "in ...", so it must describe a future moment. If the
    # half-life renewal point has already passed, fall back to the expiry
    # (which the caller guarantees is not in the past).
    renews_target = renews_ts if (renews_ts and renews_ts > now) else expiry
    renews = 'in ' + relative_time(renews_target, now, short=True)
    return last_active, renews


def _lease_row(parts, vlan_name, lease_secs, now, mac_to_res, mac_to_desc,
               arp_table):
    """Build one table row from a split lease line.

    *parts* is read as ``[expiry, mac, ip, hostname, ...]`` where a hostname
    of ``'*'`` means "none". Returns ``None`` for a lease whose expiry is in
    the past. Raises ``ValueError`` when the expiry field is not an integer.
    """
    expiry = int(parts[0])
    # NOTE(review): dnsmasq is believed to record infinite leases with an
    # expiry of 0; such leases are treated as expired here -- confirm whether
    # they should be listed.
    if expiry < now:
        return None

    last_active, renews = _lease_timing(expiry, lease_secs, now)

    mac_norm = parts[1].lower()
    device_h = parts[3] if parts[3] != '*' else None
    desc = mac_to_desc.get(mac_norm)
    # A reserved hostname from config wins over the client-supplied one.
    name = mac_to_res.get(mac_norm) or device_h
    if name:
        # NOTE(review): desc_attr is built but never inserted into the
        # output; both branches render just the escaped name. Confirm whether
        # wrapping markup carrying this attribute was intended.
        desc_attr = f' data-hostname-desc="{e(desc)}"' if desc else ''
        hostname_html = f'{e(name)}' if desc_attr else e(name)
    else:
        hostname_html = '-'

    arp_entry = arp_table.get(mac_norm, {})
    return {
        'hostname': hostname_html,
        'ip_address': parts[2],
        'mac_address': parts[1],
        'vendor': _vendor_cell(_get_vendor(parts[1])),
        'vlan_name': vlan_name,
        'last_active': last_active,
        'renews': renews,
        'status': _status_badge(arp_entry.get('state', '')),
    }


def live_dhcp_leases():
    """Return table rows for active DHCP leases plus ARP-only neighbours.

    Reads every ``dnsmasq-routlin-<vlan>.leases`` file under
    ``/var/lib/misc``, skipping expired leases. Unreadable files and
    malformed lines are skipped individually. Hosts present in the ARP table
    but without a lease are appended with empty lease columns and a VLAN
    inferred from their IP address.
    """
    rows = []
    now = int(datetime.now(tz=timezone.utc).timestamp())
    cfg = load_config()
    vlans = cfg.get('vlans', [])
    arp_table = _get_arp_table()
    lease_macs = set()

    vlan_lease_secs = {
        # "or {}" tolerates a dhcp_information key that is present but null.
        v['name']: _parse_lease_secs(
            (v.get('dhcp_information') or {}).get('lease_time', '')
        )
        for v in vlans if v.get('name')
    }
    reservations = cfg.get('dhcp_reservations', [])
    mac_to_res = {
        r['mac'].lower(): r['hostname']
        for r in reservations if r.get('mac') and r.get('hostname')
    }
    mac_to_desc = {
        r['mac'].lower(): r['description']
        for r in reservations if r.get('mac') and r.get('description')
    }

    # Sorted so that row order does not depend on directory listing order.
    for leases_file in sorted(glob.glob('/var/lib/misc/dnsmasq-routlin-*.leases')):
        stem = os.path.basename(leases_file)
        vlan_name = stem[len('dnsmasq-routlin-'):-len('.leases')]
        lease_secs = vlan_lease_secs.get(vlan_name)
        try:
            with open(leases_file) as f:
                lines = f.readlines()
        except (OSError, UnicodeDecodeError):
            continue

        for line in lines:
            parts = line.split()
            if len(parts) < 4:
                continue
            try:
                row = _lease_row(parts, vlan_name, lease_secs, now,
                                 mac_to_res, mac_to_desc, arp_table)
            except Exception:
                # Deliberately best-effort: a bad line (or a failing render
                # helper) drops that one row, not the rest of the file.
                continue
            if row is None:
                continue
            lease_macs.add(parts[1].lower())
            rows.append(row)

    for mac, arp in arp_table.items():
        if mac in lease_macs:
            continue
        rows.append({
            'hostname': '-',
            'ip_address': arp['ip'],
            'mac_address': mac,
            'vendor': _vendor_cell(_get_vendor(mac)),
            'vlan_name': _vlan_for_ip(arp['ip'], vlans),
            'last_active': '',
            'renews': '',
            'status': _status_badge(arp['state']),
        })

    return rows


def collect_tokens(cfg):
    """Build the template tokens for the DHCP leases page.

    Starts from the shared layout tokens, adds ``VLAN_FILTER_OPTIONS``, and
    renders one table token per table item found in the page's
    ``content.json``. Only the ``live:dhcp_leases`` datasource is populated;
    any other datasource gets an empty row list.
    """
    tokens = collect_layout_tokens(cfg)
    vlans = cfg.get('vlans', [])
    vlan_names = [v.get('name', '') for v in vlans]
    # NOTE(review): as written this always evaluates to an empty string (the
    # per-VLAN f-string has no content). Confirm the intended option markup.
    filter_opts = '' + ''.join(
        f'' for n in vlan_names
    )
    tokens['VLAN_FILTER_OPTIONS'] = filter_opts

    content = load_json(f'{PAGES_DIR}/dhcpleases/content.json')
    for table_item in iter_table_items(content.get('items', [])):
        ds = table_item.get('datasource', '')
        rows = live_dhcp_leases() if ds == 'live:dhcp_leases' else []
        tokens[table_token_key(ds)] = build_table(table_item, tokens, rows)
    return tokens