"""Token collection for the DHCP leases page.

Reads the per-VLAN dnsmasq lease files, enriches each active lease with
reservation and vendor information, and hands the resulting rows to
``build_table``.
"""
import glob
import logging
import os
from datetime import datetime, timezone

from config_utils import collect_layout_tokens, load_config, relative_time
from factory import (
    load_json,
    build_table,
    table_token_key,
    iter_table_items,
    PAGES_DIR,
    e,
)

logger = logging.getLogger(__name__)

# Both vendor databases are optional: if the package is missing or its
# initialisation fails, the corresponding lookup is simply disabled.
try:
    import manuf as _manuf_mod
    _mac_parser = _manuf_mod.MacParser()
except Exception:
    _mac_parser = None

try:
    from mac_vendor_lookup import MacLookup as _MacLookup
    _mac_lookup = _MacLookup()
except Exception:
    _mac_lookup = None

_LEASES_GLOB = '/var/lib/misc/dnsmasq-routlin-*.leases'
_LEASES_PREFIX = 'dnsmasq-routlin-'
_LEASES_SUFFIX = '.leases'

# Multipliers for the lease-time suffixes understood by _parse_lease_secs.
_LEASE_UNIT_SECS = {'m': 60, 'h': 3600, 'd': 86400, 'w': 604800}


def _get_vendor(mac):
    """Return ``(short, long)`` vendor names for *mac*.

    ``short`` comes from ``manuf``'s ``get_manuf`` and ``long`` from
    ``mac_vendor_lookup``'s ``lookup``.  Either element is ``''`` when the
    corresponding library is unavailable, raises, or returns nothing.
    """
    short_name, long_name = '', ''
    if _mac_parser:
        try:
            short_name = _mac_parser.get_manuf(mac) or ''
        except Exception:
            # Best effort: an unknown or malformed MAC just means no vendor.
            pass
    if _mac_lookup:
        try:
            long_name = _mac_lookup.lookup(mac) or ''
        except Exception:
            pass
    return (short_name, long_name)


def _vendor_cell(vendor):
    """Render the vendor table cell from a ``(short, long)`` pair.

    Prefers the short name, falls back to the first 8 characters of the
    long name, and returns ``'-'`` when neither is available.  The text is
    passed through ``e`` (imported from ``factory``; presumably an
    HTML-escaping helper).
    """
    short_name, long_name = vendor
    display = short_name if short_name else (long_name[:8] if long_name else '')
    if not display:
        return '-'
    if long_name:
        # NOTE(review): as written this branch renders the same text as the
        # fallback below and never uses long_name itself -- confirm whether
        # extra markup (e.g. a tooltip with the long name) was intended.
        return f'{e(display)}'
    return e(display)


def _parse_lease_secs(s):
    """Convert a lease-time setting to seconds, or ``None`` if unparseable.

    Accepts a bare integer (seconds) or an integer followed by ``m``, ``h``,
    ``d`` or ``w``.  Anything else (empty string, ``"infinite"``, garbage)
    yields ``None``.
    """
    text = str(s).strip().lower()
    if not text:
        return None
    factor = _LEASE_UNIT_SECS.get(text[-1])
    digits = text[:-1] if factor else text
    try:
        return int(digits) * (factor or 1)
    except ValueError:
        return None


def _reservation_maps(cfg):
    """Build lower-cased-MAC -> hostname and MAC -> description lookups."""
    hostnames, descriptions = {}, {}
    for res in cfg.get('dhcp_reservations', []):
        mac = res.get('mac')
        if not mac:
            continue
        key = mac.lower()
        if res.get('hostname'):
            hostnames[key] = res['hostname']
        if res.get('description'):
            descriptions[key] = res['description']
    return hostnames, descriptions


def _last_active_label(obtained_ts, renews_ts, now):
    """Describe when the lease was (or will be) obtained relative to *now*."""
    if obtained_ts is None:
        return '-'
    if obtained_ts <= now:
        return relative_time(obtained_ts, now, short=True) + ' ago'
    # The computed "obtained" time lies in the future; report the expected
    # renewal instead.
    if renews_ts and renews_ts > now:
        return 'ETA ' + relative_time(renews_ts, now, short=True)
    return 'ETA soon'


def _hostname_cell(res_h, device_h, desc):
    """Render the hostname cell from the reserved and device-reported names.

    When both names exist and differ (case-insensitively) the reserved name
    is shown first with the device-reported name in parentheses on the next
    line.
    """
    desc_attr = f' data-hostname-desc="{e(desc)}"' if desc else ''
    # NOTE(review): desc_attr only selects a branch below and is never
    # interpolated; each pair of branches renders the same text as written.
    # Confirm whether wrapping markup carrying desc_attr was intended.
    if res_h and device_h and device_h.lower() != res_h.lower():
        if desc_attr:
            return f'{e(res_h)}\n({e(device_h)})'
        return f'{e(res_h)}\n({e(device_h)})'
    if res_h:
        return f'{e(res_h)}' if desc_attr else e(res_h)
    if device_h:
        return f'{e(device_h)}' if desc_attr else e(device_h)
    return '-'


def _lease_row(line, vlan_name, lease_secs, now, mac_to_res, mac_to_desc):
    """Turn one lease-file line into a table row dict.

    Returns ``None`` for lines that are too short, have a non-integer
    expiry, or have already expired.  Fields are read positionally:
    expiry timestamp, MAC, IP address, hostname (``*`` meaning none).
    """
    parts = line.split()
    if len(parts) < 4:
        return None
    try:
        expiry = int(parts[0])
    except ValueError:
        # Skip only this malformed line rather than the rest of the file.
        return None
    # NOTE(review): dnsmasq is understood to write an expiry of 0 for
    # infinite leases; such entries are dropped by this check -- confirm
    # whether they should be listed.
    if expiry < now:
        return None

    # Without a known lease length the start/renewal times cannot be derived.
    obtained_ts = (expiry - lease_secs) if lease_secs else None
    renews_ts = (expiry - lease_secs // 2) if lease_secs else None

    mac = parts[1]
    mac_norm = mac.lower()
    device_h = parts[3] if parts[3] != '*' else None

    return {
        'hostname': _hostname_cell(
            mac_to_res.get(mac_norm), device_h, mac_to_desc.get(mac_norm)
        ),
        'ip_address': parts[2],
        'mac_address': mac,
        'vendor': _vendor_cell(_get_vendor(mac)),
        'vlan_name': vlan_name,
        'last_active': _last_active_label(obtained_ts, renews_ts, now),
        'renews': 'in ' + relative_time(renews_ts or expiry, now, short=True),
    }


def live_dhcp_leases():
    """Return one row dict per unexpired lease across all VLAN lease files.

    Lease files are matched by ``/var/lib/misc/dnsmasq-routlin-*.leases``;
    the part of the file name between the prefix and ``.leases`` is used as
    the VLAN name and to look up that VLAN's configured lease time.

    Reading is best effort: a malformed line is skipped on its own, and an
    unexpected failure while processing a file is logged and ends that file
    only (rows already collected are kept).
    """
    rows = []
    now = int(datetime.now(tz=timezone.utc).timestamp())
    cfg = load_config()

    vlan_lease_secs = {
        v['name']: _parse_lease_secs(
            # "or {}" tolerates an explicit null dhcp_information entry.
            (v.get('dhcp_information') or {}).get('lease_time', '')
        )
        for v in cfg.get('vlans', [])
        if v.get('name')
    }
    mac_to_res, mac_to_desc = _reservation_maps(cfg)

    for leases_file in glob.glob(_LEASES_GLOB):
        stem = os.path.basename(leases_file)
        vlan_name = stem[len(_LEASES_PREFIX):-len(_LEASES_SUFFIX)]
        lease_secs = vlan_lease_secs.get(vlan_name)
        try:
            with open(leases_file) as f:
                for line in f:
                    row = _lease_row(
                        line, vlan_name, lease_secs, now,
                        mac_to_res, mac_to_desc,
                    )
                    if row is not None:
                        rows.append(row)
        except Exception:
            # Keep the page rendering, but leave a trace of what went wrong.
            logger.exception('Failed to read DHCP leases from %s', leases_file)
    return rows


def collect_tokens(cfg):
    """Build the template tokens for the DHCP leases page.

    Starts from the shared layout tokens, adds ``VLAN_FILTER_OPTIONS``, and
    renders every table item found in the page's ``content.json``; tables
    whose datasource is ``live:dhcp_leases`` are fed the live lease rows,
    all others get an empty row list.
    """
    tokens = collect_layout_tokens(cfg)

    vlan_names = [v.get('name', '') for v in cfg.get('vlans', [])]
    # NOTE(review): as written this always evaluates to an empty string (one
    # empty fragment per VLAN name) -- confirm whether per-VLAN option markup
    # was intended here.
    filter_opts = '' + ''.join(f'' for n in vlan_names)
    tokens['VLAN_FILTER_OPTIONS'] = filter_opts

    content = load_json(f'{PAGES_DIR}/dhcpleases/content.json')
    for table_item in iter_table_items(content.get('items', [])):
        ds = table_item.get('datasource', '')
        rows = live_dhcp_leases() if ds == 'live:dhcp_leases' else []
        tokens[table_token_key(ds)] = build_table(table_item, tokens, rows)
    return tokens