#!/usr/bin/env python3
"""
maintenance.py -- Periodic maintenance tasks run by the routlin-maintenance
systemd timer.

Tasks performed on each run:
  1. DDNS update (delegates to ddns.py)
  2. FreeRADIUS log rotation
  3. ARP cache refresh
  4. DNS metrics collection (delegates to metrics.py)

Usage:
    python3 maintenance.py
"""

import ipaddress
import json
import re
import subprocess
import sys
from pathlib import Path

SCRIPT_DIR = Path(__file__).parent
CONFIG_FILE = SCRIPT_DIR / "config.json"
DDNS_SCRIPT = SCRIPT_DIR / "ddns.py"
METRICS_SCRIPT = SCRIPT_DIR / "metrics.py"
RADIUS_LOG_FILE = Path("/var/log/freeradius/radius.log")
ARP_CACHE_FILE = Path("/var/lib/misc/arp-cache.json")

# Size limit (in KB) applied when the config has no usable "log_max_kb".
DEFAULT_RADIUS_LOG_MAX_KB = 1024


# ===================================================================
# Config
# ===================================================================

def load_config(path=None):
    """Load and return the JSON configuration as a Python object.

    Args:
        path: Optional path of the config file. Defaults to CONFIG_FILE.

    Returns:
        The decoded JSON document.

    Exits the process with status 1 (after printing an error to stderr) when
    the file does not exist or does not contain valid JSON.
    """
    config_path = CONFIG_FILE if path is None else Path(path)
    try:
        # JSON is UTF-8 by specification; do not depend on the locale.
        with open(config_path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"ERROR: Config file not found: {config_path}", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as exc:
        print(f"ERROR: Config file {config_path} is not valid JSON: {exc}",
              file=sys.stderr)
        sys.exit(1)


# ===================================================================
# Delegated scripts (DDNS, metrics)
# ===================================================================

def _run_script(script, *args):
    """Run a sibling Python script with the current interpreter.

    Failures are reported as warnings and never raised, so one failing task
    does not prevent the remaining maintenance tasks from running.
    """
    try:
        result = subprocess.run([sys.executable, str(script), *args])
    except OSError as exc:
        print(f"WARNING: Could not run {script.name}: {exc}")
        return
    if result.returncode != 0:
        print(f"WARNING: {script.name} exited with status {result.returncode}.")


def run_ddns():
    """Run the DDNS update by invoking ddns.py with ``--update``."""
    _run_script(DDNS_SCRIPT, "--update")


def run_metrics():
    """Run DNS metrics collection by invoking metrics.py with ``--collect``."""
    _run_script(METRICS_SCRIPT, "--collect")


# ===================================================================
# FreeRADIUS log rotation
# ===================================================================

def _clear_radius_log_dir(log_dir, reason):
    """Delete every regular file directly inside *log_dir*.

    Sub-directories are left alone. Each failed deletion produces a warning;
    the "cleared" message is printed only when every file was removed.

    Args:
        log_dir: Directory (a Path) holding the FreeRADIUS log files.
        reason: Short human-readable text included in the status message.
    """
    # NOTE(review): a deleted file that the daemon still holds open keeps
    # using disk space until it is reopened -- confirm FreeRADIUS reopens it.
    try:
        files = [p for p in log_dir.iterdir() if p.is_file()]
    except PermissionError:
        print(f"WARNING: Cannot read {log_dir} (permission denied).")
        return
    except OSError as e:
        print(f"WARNING: Error clearing FreeRADIUS log dir: {e}")
        return
    if not files:
        return

    failed = 0
    for p in files:
        try:
            p.unlink()
        except PermissionError:
            failed += 1
            print(f"WARNING: Cannot delete {p} (permission denied).")
        except OSError as e:
            failed += 1
            print(f"WARNING: Error deleting {p}: {e}")

    if failed:
        print(f"WARNING: {failed} of {len(files)} FreeRADIUS log file(s) "
              f"could not be deleted ({reason}).")
    else:
        print(f"FreeRADIUS logs cleared ({reason}).")


def rotate_radius_log(radius_cfg, log_dir=None):
    """Enforce the FreeRADIUS logging policy on the log directory.

    * Directory missing: nothing to do.
    * ``general.logging`` falsy: all log files are deleted.
    * Otherwise: all log files are deleted once their combined size exceeds
      ``general.log_max_kb`` kilobytes (default DEFAULT_RADIUS_LOG_MAX_KB).

    Args:
        radius_cfg: The "radius" section of the configuration (a dict).
        log_dir: Optional directory to operate on. Defaults to the parent
            directory of RADIUS_LOG_FILE.
    """
    general = radius_cfg.get("general") or {}
    log_dir = RADIUS_LOG_FILE.parent if log_dir is None else Path(log_dir)
    if not log_dir.exists():
        return

    if not general.get("logging", False):
        _clear_radius_log_dir(log_dir, "logging disabled")
        return

    max_kb = general.get("log_max_kb", DEFAULT_RADIUS_LOG_MAX_KB)
    try:
        # float() accepts ints, floats and numeric strings. Multiplying a
        # string by 1024 directly would repeat it instead of scaling it.
        max_bytes = int(float(max_kb) * 1024)
    except (TypeError, ValueError, OverflowError):
        print(f"WARNING: Invalid log_max_kb {max_kb!r}; "
              f"using {DEFAULT_RADIUS_LOG_MAX_KB} KB.")
        max_kb = DEFAULT_RADIUS_LOG_MAX_KB
        max_bytes = DEFAULT_RADIUS_LOG_MAX_KB * 1024

    try:
        files = [p for p in log_dir.iterdir() if p.is_file()]
        total = sum(p.stat().st_size for p in files)
    except PermissionError:
        print(f"WARNING: Cannot read {log_dir} (permission denied).")
        return
    except OSError as e:
        print(f"WARNING: Error checking FreeRADIUS log dir: {e}")
        return

    if total > max_bytes:
        _clear_radius_log_dir(
            log_dir, f"total {total // 1024} KB exceeded {max_kb} KB")


# ===================================================================
# ARP cache
# ===================================================================

ARP_MAX_AGE_SECS = 4 * 3600

# Matches the first number of the "used A/B/C" field of `ip -stats neigh`.
_ARP_USED_RE = re.compile(r'used\s+(\d+)/')

# Neighbour states that are never written to the cache.
_ARP_SKIPPED_STATES = frozenset(('FAILED', 'PERMANENT', 'NOARP', 'INCOMPLETE'))


def _vlan_networks(cfg):
    """Return the IPv4 networks described by the "vlans" list of *cfg*.

    Entries lacking a subnet or mask, or describing an invalid network, are
    skipped silently.
    """
    networks = []
    for vlan in cfg.get('vlans') or []:
        subnet = vlan.get('subnet')
        mask = vlan.get('subnet_mask')
        if not (subnet and mask):
            continue
        try:
            networks.append(
                ipaddress.IPv4Network(f'{subnet}/{mask}', strict=False))
        except ValueError:
            # Best effort: a malformed VLAN must not block the other VLANs.
            continue
    return networks


def _parse_neighbours(output, vlan_networks):
    """Pick one neighbour entry per MAC address from `ip -stats neigh` output.

    A line is kept only if it has an ``lladdr`` field, an IPv4 address that
    lies in one of *vlan_networks* (when that list is non-empty), an
    interface other than ``br-*``/``docker0``, a state outside
    _ARP_SKIPPED_STATES, and -- unless REACHABLE -- a "used" age of at most
    ARP_MAX_AGE_SECS. When a MAC appears several times, the entry with the
    smallest "used" age wins.

    Returns:
        dict mapping lower-case MAC -> {'ip': <address>, 'state': 'REACHABLE'}.
        The state is always recorded as 'REACHABLE', whatever the kernel
        reported.
    """
    best = {}  # mac -> (used_secs, entry)
    for line in output.splitlines():
        parts = line.split()
        if 'lladdr' not in parts:
            continue
        ip_text = parts[0]
        if ':' in ip_text:  # IPv6 neighbour
            continue
        try:
            addr = ipaddress.IPv4Address(ip_text)
        except ValueError:
            continue
        if vlan_networks and not any(addr in net for net in vlan_networks):
            continue

        iface = parts[2] if len(parts) > 2 else ''
        if iface.startswith('br-') or iface == 'docker0':
            continue

        state = parts[-1]
        if state in _ARP_SKIPPED_STATES:
            continue

        used_match = _ARP_USED_RE.search(line)
        used_secs = int(used_match.group(1)) if used_match else 0
        if state != 'REACHABLE' and used_secs > ARP_MAX_AGE_SECS:
            continue

        mac_idx = parts.index('lladdr') + 1
        if mac_idx >= len(parts):
            # Truncated line: skip it rather than abort the whole refresh.
            continue
        mac = parts[mac_idx].lower()
        if mac not in best or used_secs < best[mac][0]:
            best[mac] = (used_secs, {'ip': ip_text, 'state': 'REACHABLE'})
    return {mac: entry for mac, (_, entry) in best.items()}


def refresh_arp_cache(cfg):
    """Rebuild ARP_CACHE_FILE from the kernel neighbour table.

    Runs ``ip -stats neigh`` (5 second timeout), filters the result with
    _parse_neighbours() and writes it to ARP_CACHE_FILE as a JSON object.
    Any failure is reported as a warning. If the ``ip`` command fails, the
    existing cache file is left untouched instead of being emptied.

    Args:
        cfg: The full configuration; only its "vlans" list is read.
    """
    vlan_networks = _vlan_networks(cfg)
    try:
        result = subprocess.run(['ip', '-stats', 'neigh'],
                                capture_output=True, text=True, timeout=5)
        if result.returncode != 0:
            print("WARNING: Could not refresh ARP cache: "
                  f"'ip neigh' exited with status {result.returncode}")
            return
        entries = _parse_neighbours(result.stdout, vlan_networks)
        ARP_CACHE_FILE.write_text(json.dumps(entries))
    except Exception as exc:
        # Deliberately broad: the refresh is best-effort and must never
        # abort the remaining maintenance tasks.
        print(f"WARNING: Could not refresh ARP cache: {exc}")


# ===================================================================
# Main
# ===================================================================

def main():
    """Run all maintenance tasks in order."""
    run_ddns()
    cfg = load_config()
    rotate_radius_log(cfg.get("radius") or {})
    refresh_arp_cache(cfg)
    run_metrics()


if __name__ == "__main__":
    main()