"""
mod_dns_queries.py -- DNS query log collector.

Reads dnsmasq query logs from journalctl using a cursor bookmark, parses
query/result line pairs, and appends rows to a SQLite database.

Called by:
  - maintenance.py on each timer tick
  - routlin-dash overview page on each page load (background thread)

Only VLANs with dnsmasq_log_queries_days > 0 are collected.
"""

import json
import re
import sqlite3
import subprocess
from collections import defaultdict, deque
from pathlib import Path

import mod_shared as shared
import mod_validation as validation

# SQLite database file, kept as a hidden file in the script directory.
DB_FILE = shared.SCRIPT_DIR / ".dns-queries"

# Incoming query line: "query[TYPE] domain from client".
# Groups: 1 = query type, 2 = domain, 3 = client address.  The client group
# accepts IPv4 (192.168.1.10) as well as IPv6 (fd00::1, ::ffff:10.0.0.2)
# addresses so queries from IPv6 clients are not silently dropped.
QUERY_RE = re.compile(r'query\[(\w+)\] (\S+) from ([0-9A-Fa-f:.]+)')
# Result line whose answer is 0.0.0.0 -- treated as a blocked (sinkholed) query.
BLOCK_RE = re.compile(r'(\S+) is 0\.0\.0\.0$')
# "cached <domain> is ..." -- answered from dnsmasq's cache.
CACHED_RE = re.compile(r'cached (\S+) is ')
# "forwarded <domain> to <server>" -- sent upstream.
FWD_RE = re.compile(r'forwarded (\S+) to ')
# "reply <domain> is ..." -- upstream answer.
REPLY_RE = re.compile(r'\breply (\S+) is ')
# "<source> <domain> is ..." where the source token starts with '/'
# (e.g. a hosts file path) -- answered from a local file.
LOCAL_RE = re.compile(r'/\S+ (\S+) is ')


# ===================================================================
# Database
# ===================================================================

def open_db():
    """
    Open (creating if necessary) the DNS query database.

    Enables WAL journaling and makes sure the ``dns_queries`` and
    ``collector_state`` tables and the ``dns_queries`` indexes exist.

    Returns:
        An open ``sqlite3.Connection``; the caller is responsible for
        closing it.

    Raises:
        sqlite3.Error if the database cannot be opened or initialised (the
        half-open connection is closed before the error propagates).
    """
    con = sqlite3.connect(DB_FILE, timeout=10)
    try:
        con.execute('PRAGMA journal_mode=WAL')
        con.executescript('''
            CREATE TABLE IF NOT EXISTS dns_queries (
                id        INTEGER PRIMARY KEY AUTOINCREMENT,
                ts        INTEGER NOT NULL,
                domain    TEXT    NOT NULL,
                qtype     TEXT    NOT NULL,
                client_ip TEXT    NOT NULL,
                vlan      TEXT    NOT NULL,
                blocked   INTEGER NOT NULL DEFAULT 0
            );
            CREATE INDEX IF NOT EXISTS idx_dq_ts     ON dns_queries(ts);
            CREATE INDEX IF NOT EXISTS idx_dq_domain ON dns_queries(domain, blocked);
            CREATE INDEX IF NOT EXISTS idx_dq_client ON dns_queries(client_ip);
            CREATE TABLE IF NOT EXISTS collector_state (
                key   TEXT PRIMARY KEY,
                value TEXT NOT NULL
            );
        ''')
        con.commit()
    except BaseException:
        # Don't leak the connection (and its file handles) on a failed init.
        con.close()
        raise
    return con


def _get_cursor(con):
    """Return the stored journalctl cursor string, or None if none is saved yet."""
    row = con.execute("SELECT value FROM collector_state WHERE key='cursor'").fetchone()
    return row[0] if row else None


def _save_cursor(con, cursor_val):
    """
    Store *cursor_val* as the journalctl bookmark.

    Does not commit: the caller commits it together with the rows it
    inserted so the cursor and the data stay consistent.
    """
    con.execute(
        "INSERT OR REPLACE INTO collector_state(key, value) VALUES ('cursor', ?)",
        (cursor_val,)
    )
# ===================================================================
# Collection
# ===================================================================

def _unit_vlan_map(data):
    """
    Map dnsmasq systemd unit names to VLAN names for logging-enabled VLANs.

    Only VLANs whose ``dnsmasq_log_queries_days`` is set and non-zero are
    included. Each unit is registered both bare and with a ``.service``
    suffix so either spelling from the journal resolves.
    """
    unit_to_vlan = {}
    for vlan in data.get('vlans', []):
        if not vlan.get('dnsmasq_log_queries_days', 0):
            continue
        iface = validation.derive_interface(vlan, data)
        svc = shared.vlan_service_name(vlan, iface)
        unit_to_vlan[svc] = vlan['name']
        unit_to_vlan[svc + '.service'] = vlan['name']
    return unit_to_vlan


def _result_domain(msg):
    """
    Classify a non-query dnsmasq log message.

    Returns:
        ``(domain, 1)`` when the answer is 0.0.0.0 (treated as blocked),
        ``(domain, 0)`` for cached / forwarded / reply / file-sourced lines,
        ``(None, 0)`` when the message matches none of the result patterns.
    """
    bm = BLOCK_RE.search(msg)
    if bm:
        return bm.group(1), 1
    for pat in (CACHED_RE, FWD_RE, REPLY_RE, LOCAL_RE):
        pm = pat.search(msg)
        if pm:
            return pm.group(1), 0
    return None, 0


def _parse_journal(output, unit_to_vlan, start_cursor):
    """
    Turn ``journalctl -o json`` output into dns_queries rows.

    Each query line is queued until a result line for the same domain
    arrives from the same VLAN's dnsmasq unit; that result line decides
    whether the query was blocked.

    Args:
        output: journalctl stdout, one JSON object per line.
        unit_to_vlan: systemd unit name -> VLAN name (see _unit_vlan_map).
        start_cursor: the cursor the read started after (may be None).

    Returns:
        ``(rows, last_cursor)``: rows are
        ``(ts, domain, qtype, client_ip, vlan, blocked)`` tuples;
        last_cursor is the ``__CURSOR`` of the last entry that carried one,
        or *start_cursor* if none did.
    """
    # Queued queries keyed by (vlan, domain). FIFO so concurrent same-domain
    # queries from different clients pair in arrival order; including the
    # VLAN keeps one dnsmasq instance's result lines from being paired with
    # another instance's queries.
    pending = defaultdict(deque)
    rows = []
    last_cursor = start_cursor

    for line in output.splitlines():
        try:
            entry = json.loads(line)
        except ValueError:
            # Not a JSON record; skip it.
            continue
        if not isinstance(entry, dict):
            continue

        jcursor = entry.get('__CURSOR', '')
        if jcursor:
            last_cursor = jcursor

        msg = entry.get('MESSAGE', '')
        raw_unit = entry.get('_SYSTEMD_UNIT', '')
        # journalctl's JSON output can represent binary or multi-valued
        # fields as arrays instead of strings; those can't be parsed here.
        if not isinstance(msg, str) or not isinstance(raw_unit, str):
            continue

        vlan_name = (unit_to_vlan.get(raw_unit)
                     or unit_to_vlan.get(raw_unit.removesuffix('.service')))
        if not vlan_name:
            continue

        m = QUERY_RE.search(msg)
        if m:
            # Incoming query line -- queue it and wait for its result line.
            ts = int(entry.get('__REALTIME_TIMESTAMP', 0)) // 1_000_000
            pending[(vlan_name, m.group(2))].append({
                'ts': ts,
                'qtype': m.group(1),
                'client_ip': m.group(3),
                'vlan': vlan_name,
            })
            continue

        # Result line -- identify the domain and whether it was blocked.
        domain, blocked = _result_domain(msg)
        key = (vlan_name, domain)
        if domain and pending.get(key):
            p = pending[key].popleft()
            if not pending[key]:
                del pending[key]
            rows.append((p['ts'], domain, p['qtype'], p['client_ip'], p['vlan'], blocked))

    # Queries that never got a recognised result line: either the result
    # lands in a later journal batch (the collector ran between the two
    # lines) or it matched none of the result patterns. They are recorded as
    # not blocked on the assumption that a blocked answer is logged right
    # after its query and would already be in this batch. A late result
    # line for them finds nothing pending next run and is ignored.
    for (_vlan, domain), queue in pending.items():
        for p in queue:
            rows.append((p['ts'], domain, p['qtype'], p['client_ip'], p['vlan'], 0))

    return rows, last_cursor


def collect(data):
    """
    Fetch new dnsmasq log entries from journalctl since the last stored
    cursor, parse query/result pairs, and insert into dns_queries.

    The rows and the advanced cursor are written in a single
    ``BEGIN IMMEDIATE`` transaction that first re-checks the stored cursor.
    When two collectors overlap (e.g. the maintenance timer and a dashboard
    background thread), the one that commits second discards its batch
    instead of inserting duplicate rows.

    Returns the number of rows inserted (0 when no VLAN has logging enabled,
    nothing new was found, or another collector already stored the batch).
    """
    unit_to_vlan = _unit_vlan_map(data)
    if not unit_to_vlan:
        return 0

    con = open_db()
    try:
        journal_cursor = _get_cursor(con)

        cmd = ['journalctl', '-u', 'dnsmasq-routlin-*', '--no-pager', '-o', 'json']
        if journal_cursor:
            cmd += ['--after-cursor', journal_cursor]
        # Decode explicitly so parsing does not depend on the process locale;
        # undecodable bytes are replaced rather than aborting the whole run.
        result = subprocess.run(cmd, capture_output=True, text=True,
                                encoding='utf-8', errors='replace')

        rows, last_cursor = _parse_journal(result.stdout, unit_to_vlan, journal_cursor)

        # Take the write lock, then confirm nobody advanced the cursor while
        # journalctl was running; if someone did, this batch is already stored.
        con.execute('BEGIN IMMEDIATE')
        if _get_cursor(con) != journal_cursor:
            con.rollback()
            return 0

        if rows:
            con.executemany(
                'INSERT INTO dns_queries(ts, domain, qtype, client_ip, vlan, blocked)'
                ' VALUES(?,?,?,?,?,?)',
                rows
            )
        if last_cursor and last_cursor != journal_cursor:
            _save_cursor(con, last_cursor)
        con.commit()
        shared.chown_to_script_dir_owner(DB_FILE)
    finally:
        # Uncommitted work is rolled back when the connection closes.
        con.close()

    return len(rows)


def prune(data):
    """
    Delete dns_queries rows older than the longest retention period
    configured on any VLAN (``dnsmasq_log_queries_days``).

    NOTE: a single cutoff is applied to all rows, so VLANs configured with a
    shorter retention keep their rows as long as the longest-retention VLAN.

    Returns the number of rows deleted (0 when no positive retention is
    configured or the database does not exist yet).
    """
    import time

    # `or 0` tolerates VLANs where the key is present but set to None.
    days = max(
        (v.get('dnsmasq_log_queries_days') or 0 for v in data.get('vlans', [])),
        default=0,
    )
    # A non-positive retention would put the cutoff in the future and wipe
    # every row, so treat it like "not configured".
    if days <= 0:
        return 0
    if not DB_FILE.exists():
        return 0

    cutoff = int(time.time()) - days * 86400
    con = open_db()
    try:
        cur = con.execute('DELETE FROM dns_queries WHERE ts < ?', (cutoff,))
        deleted = cur.rowcount
        con.commit()
    finally:
        con.close()
    return deleted