linuxrouter/routlin/metrics.py
2026-06-11 01:31:57 -04:00

517 lines
18 KiB
Python

#!/usr/bin/env python3
"""
metrics.py -- DNS metrics collection and display.
Collects DNS stats by sending SIGUSR1 to dnsmasq instances, parses output
from journalctl, and stores daily-aggregated totals in a SQLite database.
Also collects per-query DNS logs from journalctl and stores them for the
dashboard DNS Queries page.
Usage:
    python3 metrics.py --collect    Collect and store metrics (run by maintenance timer)
    python3 metrics.py --view       Display all-time metrics summary
"""
import json
import os
import re
import signal
import sqlite3
import subprocess
import sys
import time
from collections import defaultdict, deque
from datetime import date
from pathlib import Path
import mod_shared as shared
import mod_validation as validation
# Directory containing this script; the config file path is built from it.
SCRIPT_DIR = Path(__file__).parent
# JSON configuration file read by load_config().
CONFIG_FILE = SCRIPT_DIR / "config.json"
# SQLite database files for aggregated metrics and per-query logs.
# NOTE(review): these are anchored at shared.SCRIPT_DIR rather than the local
# SCRIPT_DIR above -- presumably the same directory; confirm.
METRICS_DB_FILE = shared.SCRIPT_DIR / ".dns-metrics"
QUERIES_DB_FILE = shared.SCRIPT_DIR / ".dns-queries"
# ===================================================================
# Config
# ===================================================================
def load_config():
    """
    Load and return the parsed contents of CONFIG_FILE.

    Prints an error to stderr and exits with status 1 when the file does not
    exist. Any other failure (unreadable file, malformed JSON) propagates to
    the caller unchanged.
    """
    try:
        # Open directly instead of checking exists() first (no race between
        # the check and the open), and decode as UTF-8 explicitly so the
        # result does not depend on the process locale.
        with open(CONFIG_FILE, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"ERROR: Config file not found: {CONFIG_FILE}", file=sys.stderr)
        sys.exit(1)
# ===================================================================
# Metrics database
# ===================================================================
def open_metrics_db():
    """
    Open the metrics database at METRICS_DB_FILE and return the connection.

    The connection is switched to WAL journal mode and the daily_totals and
    daily_servers tables are created when they do not exist yet. The caller
    is responsible for closing the returned connection.
    """
    schema = '''
        CREATE TABLE IF NOT EXISTS daily_totals (
            date TEXT PRIMARY KEY,
            last_updated INTEGER,
            queries_forwarded INTEGER NOT NULL DEFAULT 0,
            queries_answered_locally INTEGER NOT NULL DEFAULT 0,
            queries_authoritative INTEGER NOT NULL DEFAULT 0,
            cache_reused INTEGER NOT NULL DEFAULT 0,
            tcp_hwm INTEGER NOT NULL DEFAULT 0,
            tcp_max_allowed INTEGER NOT NULL DEFAULT 0,
            pool_memory_max INTEGER NOT NULL DEFAULT 0,
            dnssec_subqueries_hwm INTEGER NOT NULL DEFAULT 0,
            dnssec_crypto_hwm INTEGER NOT NULL DEFAULT 0,
            dnssec_sig_fails_hwm INTEGER NOT NULL DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS daily_servers (
            date TEXT NOT NULL,
            address TEXT NOT NULL,
            queries_sent INTEGER NOT NULL DEFAULT 0,
            retried INTEGER NOT NULL DEFAULT 0,
            failed INTEGER NOT NULL DEFAULT 0,
            nxdomain INTEGER NOT NULL DEFAULT 0,
            avg_latency_ms INTEGER NOT NULL DEFAULT 0,
            PRIMARY KEY (date, address)
        );
    '''
    db = sqlite3.connect(METRICS_DB_FILE, timeout=10)
    db.execute('PRAGMA journal_mode=WAL')
    db.executescript(schema)
    db.commit()
    return db
# ===================================================================
# Queries database
# ===================================================================
# Patterns applied to dnsmasq log messages by collect_queries().
#
# "query[<type>] <domain> from <client>": captures type, domain and client.
# The client is captured as any non-space token so IPv6 addresses are kept
# whole; a digits-and-dots class would truncate "2001:db8::1" to "2001" and
# miss addresses that start with a hex letter. The type also admits "=" so
# that labels of the form "type=65" are matched
# (NOTE(review): presumably how dnsmasq prints record types it has no name
# for -- confirm against the deployed version).
QUERY_RE = re.compile(r'query\[([\w=]+)\] (\S+) from (\S+)')
# "<domain> is 0.0.0.0" at the very end of the message; collect_queries()
# records such answers as blocked.
BLOCK_RE = re.compile(r'(\S+) is 0\.0\.0\.0$')
# "cached <domain> is ...".
CACHED_RE = re.compile(r'cached (\S+) is ')
# "forwarded <domain> to ...".
FWD_RE = re.compile(r'forwarded (\S+) to ')
# "reply <domain> is ..." (word boundary before "reply").
REPLY_RE = re.compile(r'\breply (\S+) is ')
# "/<something> <domain> is ...": a slash-prefixed token followed by a domain.
LOCAL_RE = re.compile(r'/\S+ (\S+) is ')
def open_queries_db():
    """
    Open the per-query database at QUERIES_DB_FILE and return the connection.

    The connection is switched to WAL journal mode; the dns_queries table,
    its indexes and the collector_state key/value table are created when
    missing. The caller is responsible for closing the returned connection.
    """
    schema = '''
        CREATE TABLE IF NOT EXISTS dns_queries (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ts INTEGER NOT NULL,
            domain TEXT NOT NULL,
            qtype TEXT NOT NULL,
            client_ip TEXT NOT NULL,
            vlan TEXT NOT NULL,
            blocked INTEGER NOT NULL DEFAULT 0
        );
        CREATE INDEX IF NOT EXISTS idx_dq_ts ON dns_queries(ts);
        CREATE INDEX IF NOT EXISTS idx_dq_domain ON dns_queries(domain, blocked);
        CREATE INDEX IF NOT EXISTS idx_dq_client ON dns_queries(client_ip);
        CREATE TABLE IF NOT EXISTS collector_state (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
        );
    '''
    db = sqlite3.connect(QUERIES_DB_FILE, timeout=10)
    db.execute('PRAGMA journal_mode=WAL')
    db.executescript(schema)
    db.commit()
    return db
def _get_cursor(con):
    """Return the value stored under key 'cursor' in collector_state, or None."""
    found = con.execute(
        "SELECT value FROM collector_state WHERE key='cursor'"
    ).fetchone()
    if found is None:
        return None
    return found[0]
def _save_cursor(con, cursor_val):
    """
    Store cursor_val under key 'cursor' in collector_state, replacing any
    previous value. This function does not call commit itself.
    """
    upsert = "INSERT OR REPLACE INTO collector_state(key, value) VALUES ('cursor', ?)"
    con.execute(upsert, (cursor_val,))
# ===================================================================
# Collect metrics
# ===================================================================
# Patterns for the statistics lines read back from the journal after SIGUSR1.
# Compiled once here instead of being re-looked-up for every journal line.
_CACHE_STATS_RE = re.compile(r"cache size \d+, (\d+)/\d+ cache insertions re-used")
_QUERY_STATS_RE = re.compile(r"queries forwarded (\d+), queries answered locally (\d+)")
_AUTH_STATS_RE = re.compile(r"queries for authoritative zones (\d+)")
_TCP_STATS_RE = re.compile(r"highest since last SIGUSR1 (\d+), max allowed (\d+)")
_POOL_STATS_RE = re.compile(r"pool memory in use \d+, max (\d+)")
_SERVER_STATS_RE = re.compile(
    r"server (\S+): queries sent (\d+), retried (\d+), failed (\d+), "
    r"nxdomain replies (\d+), avg\. latency (\d+)ms"
)


def _accumulate_stats_line(line, m, server_map):
    """Fold one journal line into the totals dict ``m`` and ``server_map``."""
    r = _CACHE_STATS_RE.search(line)
    if r:
        m["cache_reused"] += int(r.group(1))
    r = _QUERY_STATS_RE.search(line)
    if r:
        m["queries_forwarded"] += int(r.group(1))
        m["queries_answered_locally"] += int(r.group(2))
    r = _AUTH_STATS_RE.search(line)
    if r:
        m["queries_authoritative"] += int(r.group(1))
    r = _TCP_STATS_RE.search(line)
    if r:
        # High-water marks are combined with max(), not summed.
        m["tcp_hwm"] = max(m["tcp_hwm"], int(r.group(1)))
        m["tcp_max_allowed"] = max(m["tcp_max_allowed"], int(r.group(2)))
    r = _POOL_STATS_RE.search(line)
    if r:
        m["pool_memory_max"] = max(m["pool_memory_max"], int(r.group(1)))
    r = _SERVER_STATS_RE.search(line)
    if r:
        srv = server_map.setdefault(r.group(1), {
            "address": r.group(1), "queries_sent": 0, "retried": 0,
            "failed": 0, "nxdomain": 0, "avg_latency_ms": 0,
        })
        srv["queries_sent"] += int(r.group(2))
        srv["retried"] += int(r.group(3))
        srv["failed"] += int(r.group(4))
        srv["nxdomain"] += int(r.group(5))
        # Latency is not summed: the most recent non-zero value wins.
        latency = int(r.group(6))
        if latency > 0:
            srv["avg_latency_ms"] = latency


def collect_metrics(data):
    """
    Send SIGUSR1 to each running dnsmasq instance and parse stats from
    journalctl.

    ``data`` is the parsed config; its "vlans" list (missing means empty)
    names the instances. Returns a combined metrics dict with per-server
    entries under "servers", or None if no instance could be signalled.
    The dnssec_* keys are part of the dict but no pattern here fills them,
    so they are always 0.
    """
    m = {
        "queries_forwarded": 0,
        "queries_answered_locally": 0,
        "queries_authoritative": 0,
        "cache_reused": 0,
        "tcp_hwm": 0,
        "tcp_max_allowed": 0,
        "pool_memory_max": 0,
        "dnssec_subqueries_hwm": 0,
        "dnssec_crypto_hwm": 0,
        "dnssec_sig_fails_hwm": 0,
        "servers": []
    }
    vlans = data.get("vlans", [])
    # Taken before signalling so the journal query below includes the dumps.
    t_signal = int(time.time())
    any_running = False
    for vlan in vlans:
        pid_file = shared.vlan_pid_file(vlan)
        try:
            pid = int(pid_file.read_text().strip())
            if pid <= 0:
                # kill() treats 0 and negative pids as "process group" or
                # "every process"; never signal those from a bad pid file.
                raise ValueError(f"invalid pid {pid}")
            os.kill(pid, signal.SIGUSR1)
        except (OSError, ValueError, OverflowError):
            # Missing/unreadable pid file, garbage contents, or the process
            # is gone: treat this instance as not running.
            continue
        any_running = True
    if not any_running:
        print("No dnsmasq instances are running.")
        return None
    # Give the instances time to write their statistics to the journal.
    time.sleep(2)
    server_map = {}
    for vlan in vlans:
        svc = shared.vlan_service_name(vlan, validation.derive_interface(vlan, data))
        result = subprocess.run(
            ["journalctl", "-u", svc, f"--since=@{t_signal}", "--no-pager", "-o", "cat"],
            capture_output=True, text=True
        )
        for line in result.stdout.splitlines():
            _accumulate_stats_line(line, m, server_map)
    m["servers"] = list(server_map.values())
    return m
def store_metrics(new_metrics):
    """
    Merge one collected metrics dict into today's rows of the metrics DB.

    Counter columns are added to the existing row for today's local date,
    high-water-mark columns keep the maximum, and tcp_max_allowed /
    avg_latency_ms are overwritten only by a non-zero new value. Per-server
    entries from new_metrics["servers"] are merged the same way into
    daily_servers. The database connection is always closed, also on error.

    NOTE(review): counters are added on every call; if dnsmasq reports
    running totals since start-up rather than deltas, repeated collections
    would over-count -- confirm.
    """
    today = date.today().isoformat()
    server_rows = [
        (
            today, srv["address"],
            srv["queries_sent"], srv["retried"], srv["failed"],
            srv["nxdomain"], srv["avg_latency_ms"],
        )
        for srv in new_metrics["servers"]
    ]
    con = open_metrics_db()
    try:
        con.execute('''
            INSERT INTO daily_totals(
                date, last_updated,
                queries_forwarded, queries_answered_locally, queries_authoritative,
                cache_reused, tcp_hwm, tcp_max_allowed, pool_memory_max,
                dnssec_subqueries_hwm, dnssec_crypto_hwm, dnssec_sig_fails_hwm
            ) VALUES (?,strftime('%s','now'),?,?,?,?,?,?,?,?,?,?)
            ON CONFLICT(date) DO UPDATE SET
                last_updated = strftime('%s','now'),
                queries_forwarded = queries_forwarded + excluded.queries_forwarded,
                queries_answered_locally = queries_answered_locally + excluded.queries_answered_locally,
                queries_authoritative = queries_authoritative + excluded.queries_authoritative,
                cache_reused = cache_reused + excluded.cache_reused,
                tcp_hwm = MAX(tcp_hwm, excluded.tcp_hwm),
                tcp_max_allowed = CASE WHEN excluded.tcp_max_allowed > 0
                    THEN excluded.tcp_max_allowed ELSE tcp_max_allowed END,
                pool_memory_max = MAX(pool_memory_max, excluded.pool_memory_max),
                dnssec_subqueries_hwm = MAX(dnssec_subqueries_hwm, excluded.dnssec_subqueries_hwm),
                dnssec_crypto_hwm = MAX(dnssec_crypto_hwm, excluded.dnssec_crypto_hwm),
                dnssec_sig_fails_hwm = MAX(dnssec_sig_fails_hwm, excluded.dnssec_sig_fails_hwm)
        ''', (
            today,
            new_metrics["queries_forwarded"],
            new_metrics["queries_answered_locally"],
            new_metrics["queries_authoritative"],
            new_metrics["cache_reused"],
            new_metrics["tcp_hwm"],
            new_metrics["tcp_max_allowed"],
            new_metrics["pool_memory_max"],
            new_metrics["dnssec_subqueries_hwm"],
            new_metrics["dnssec_crypto_hwm"],
            new_metrics["dnssec_sig_fails_hwm"],
        ))
        con.executemany('''
            INSERT INTO daily_servers(date, address, queries_sent, retried, failed, nxdomain, avg_latency_ms)
            VALUES (?,?,?,?,?,?,?)
            ON CONFLICT(date, address) DO UPDATE SET
                queries_sent = queries_sent + excluded.queries_sent,
                retried = retried + excluded.retried,
                failed = failed + excluded.failed,
                nxdomain = nxdomain + excluded.nxdomain,
                avg_latency_ms = CASE WHEN excluded.avg_latency_ms > 0
                    THEN excluded.avg_latency_ms
                    ELSE avg_latency_ms END
        ''', server_rows)
        con.commit()
        shared.chown_to_script_dir_owner(METRICS_DB_FILE)
    finally:
        # Release the connection even when a statement above fails.
        con.close()
# ===================================================================
# Collect DNS queries
# ===================================================================
def _classify_result(msg):
    """
    Classify a non-query log message.

    Returns (domain, blocked): blocked is 1 when the message matches
    BLOCK_RE, 0 when it matches one of the other result patterns, and the
    domain is None when nothing matches.
    """
    bm = BLOCK_RE.search(msg)
    if bm:
        return bm.group(1), 1
    for pat in (CACHED_RE, FWD_RE, REPLY_RE, LOCAL_RE):
        pm = pat.search(msg)
        if pm:
            return pm.group(1), 0
    return None, 0


def collect_queries(data):
    """
    Fetch new dnsmasq query log entries from journalctl since the last
    stored cursor, parse query/result pairs, and insert into dns_queries.

    Only VLANs with a non-zero 'dnsmasq_log_queries_days' are considered;
    when there are none, nothing is read and 0 is returned. Each "query"
    line is queued per domain and emitted when the first later result line
    for that domain is seen; queries still unanswered at the end of the
    batch are stored as not blocked. The database connection is always
    closed, also on error.

    Returns the number of rows inserted.
    """
    unit_to_vlan = {}
    for vlan in data.get('vlans', []):
        if not vlan.get('dnsmasq_log_queries_days', 0):
            continue
        iface = validation.derive_interface(vlan, data)
        svc = shared.vlan_service_name(vlan, iface)
        # Register both spellings; the journal reports the unit name with
        # whatever suffix systemd uses.
        unit_to_vlan[svc] = vlan['name']
        unit_to_vlan[svc + '.service'] = vlan['name']
    if not unit_to_vlan:
        return 0

    con = open_queries_db()
    try:
        journal_cursor = _get_cursor(con)
        cmd = ['journalctl', '-u', 'dnsmasq-routlin-*', '--no-pager', '-o', 'json']
        if journal_cursor:
            cmd += ['--after-cursor', journal_cursor]
        result = subprocess.run(cmd, capture_output=True, text=True)

        pending = defaultdict(deque)  # domain -> queries awaiting a result
        rows = []
        last_cursor = journal_cursor
        for line in result.stdout.splitlines():
            try:
                entry = json.loads(line)
            except ValueError:
                # Not a JSON line (json.JSONDecodeError is a ValueError).
                continue
            if not isinstance(entry, dict):
                continue
            # Remember how far we have read, including entries skipped below,
            # so they are not fetched again on the next run.
            jcursor = entry.get('__CURSOR', '')
            if jcursor:
                last_cursor = jcursor
            msg = entry.get('MESSAGE', '')
            raw_unit = entry.get('_SYSTEMD_UNIT', '')
            if not isinstance(msg, str) or not isinstance(raw_unit, str):
                # Fields that are not plain strings cannot be matched here.
                continue
            vlan_name = (
                unit_to_vlan.get(raw_unit)
                or unit_to_vlan.get(raw_unit.removesuffix('.service'))
            )
            if not vlan_name:
                continue
            qm = QUERY_RE.search(msg)
            if qm:
                # Journal timestamps are in microseconds; store seconds.
                ts = int(entry.get('__REALTIME_TIMESTAMP', 0)) // 1_000_000
                pending[qm.group(2)].append({
                    'ts': ts, 'qtype': qm.group(1),
                    'client_ip': qm.group(3), 'vlan': vlan_name,
                })
                continue
            domain, blocked = _classify_result(msg)
            if domain and pending.get(domain):
                # Pair the result with the oldest waiting query for the domain.
                p = pending[domain].popleft()
                if not pending[domain]:
                    del pending[domain]
                rows.append((p['ts'], domain, p['qtype'], p['client_ip'], p['vlan'], blocked))

        # Queries that never saw a result in this batch are kept as unblocked.
        for domain, q in pending.items():
            for p in q:
                rows.append((p['ts'], domain, p['qtype'], p['client_ip'], p['vlan'], 0))

        if rows:
            con.executemany(
                'INSERT INTO dns_queries(ts, domain, qtype, client_ip, vlan, blocked)'
                ' VALUES(?,?,?,?,?,?)',
                rows
            )
        if last_cursor and last_cursor != journal_cursor:
            _save_cursor(con, last_cursor)
        con.commit()
        shared.chown_to_script_dir_owner(QUERIES_DB_FILE)
    finally:
        # Release the connection even when parsing or a statement fails.
        con.close()
    return len(rows)
def prune_queries(data):
    """
    Delete dns_queries rows older than the retention period.

    The retention applied to all rows is the largest
    'dnsmasq_log_queries_days' value found on any VLAN in ``data``; missing
    or null values count as 0. Nothing is deleted when that maximum is 0 or
    the queries database file does not exist. The database connection is
    always closed, also on error.

    Returns the number of rows deleted.
    """
    days = max(
        # "or 0" lets an explicit null in the config behave like "not set"
        # instead of crashing the comparison inside max().
        (v.get('dnsmasq_log_queries_days', 0) or 0 for v in data.get('vlans', [])),
        default=0
    )
    if not days or not QUERIES_DB_FILE.exists():
        return 0
    cutoff = int(time.time()) - days * 86400
    con = open_queries_db()
    try:
        deleted = con.execute(
            'DELETE FROM dns_queries WHERE ts < ?', (cutoff,)
        ).rowcount
        con.commit()
    finally:
        con.close()
    return deleted
# ===================================================================
# Display
# ===================================================================
def show_metrics(data):
    """
    Collect a fresh sample, store it, then print the all-time summary.

    Returns without printing the summary when collect_metrics() returns
    None. Note that viewing therefore also records a sample via
    store_metrics(). The summary aggregates every row of daily_totals and
    daily_servers; per-server latency is the most recent non-zero value.
    The database connection is always closed, also on error.
    """
    new = collect_metrics(data)
    if new is None:
        return
    store_metrics(new)
    con = open_metrics_db()
    try:
        row = con.execute('''
            SELECT
                MIN(date), MAX(date), COUNT(*),
                SUM(queries_forwarded), SUM(queries_answered_locally),
                SUM(queries_authoritative), SUM(cache_reused),
                MAX(tcp_hwm), MAX(tcp_max_allowed), MAX(pool_memory_max)
            FROM daily_totals
        ''').fetchone()
        servers = con.execute('''
            SELECT
                ds.address,
                SUM(ds.queries_sent),
                SUM(ds.retried),
                SUM(ds.failed),
                SUM(ds.nxdomain),
                (SELECT avg_latency_ms FROM daily_servers d2
                 WHERE d2.address = ds.address AND d2.avg_latency_ms > 0
                 ORDER BY d2.date DESC LIMIT 1)
            FROM daily_servers ds
            GROUP BY ds.address
            ORDER BY SUM(ds.queries_sent) DESC
        ''').fetchall()
    finally:
        # Release the connection even when a query fails.
        con.close()
    first, last, days, fwd, local, auth, reused, tcp_hwm, tcp_max, pool = row
    # Aggregates over an empty table are NULL/None, hence the "or" fallbacks.
    print("DNS Metrics (all-time totals across all VLAN instances)")
    print(f" First recorded : {first or '-'}")
    print(f" Last recorded : {last or '-'}")
    print(f" Days tracked : {days or 0}")
    print()
    print("Queries")
    print(f" Forwarded to upstream : {(fwd or 0):,}")
    print(f" Answered from cache : {(local or 0):,}")
    print(f" Authoritative : {(auth or 0):,}")
    print(f" Cache reused : {(reused or 0):,}")
    print()
    print("TCP")
    print(f" Peak concurrent (HWM) : {tcp_hwm or 0}")
    print(f" Max allowed : {tcp_max or 0}")
    print()
    print(f"Pool memory peak : {pool or 0} bytes")
    if servers:
        print()
        print("Upstream servers (all-time)")
        for addr, sent, retried, failed, nxdomain, latency in servers:
            print(f" {addr}")
            print(f" Sent : {(sent or 0):,}")
            print(f" Retried : {(retried or 0):,}")
            print(f" Failed : {(failed or 0):,}")
            print(f" NXDOMAIN : {(nxdomain or 0):,}")
            print(f" Latency : {latency}ms (last recorded)" if latency else " Latency : -")
# ===================================================================
# Main
# ===================================================================
def main():
    """
    Command-line entry point.

    With neither --collect nor --view, prints the help text. With --view
    (which takes precedence when both are given), shows the summary.
    Otherwise collects and stores metrics, collects new query log rows, and
    prunes old ones, reporting non-zero insert and prune counts.
    """
    import argparse

    usage_examples = (
        "examples:\n"
        " python3 metrics.py --collect Collect and store metrics (run by maintenance timer)\n"
        " python3 metrics.py --view Display all-time metrics summary\n"
    )
    cli = argparse.ArgumentParser(
        description="DNS metrics collection and display",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    cli.add_argument("--collect", action="store_true", help="Collect and store metrics")
    cli.add_argument("--view", action="store_true", help="Display all-time metrics summary")
    opts = cli.parse_args()

    if not (opts.collect or opts.view):
        cli.print_help()
        return

    config = load_config()

    if opts.view:
        show_metrics(config)
        return

    # --collect path: aggregated metrics first, then the per-query log.
    snapshot = collect_metrics(config)
    if snapshot:
        store_metrics(snapshot)

    n_inserted = collect_queries(config)
    if n_inserted:
        print(f"DNS query collector: inserted {n_inserted} new rows.")

    n_pruned = prune_queries(config)
    if n_pruned:
        print(f"DNS query collector: pruned {n_pruned} old rows.")
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()