Development

This commit is contained in:
Matthew Grotke 2026-06-10 10:06:13 -04:00
parent fff0835553
commit 19be151c70
7 changed files with 230 additions and 31 deletions

View file

@ -7,5 +7,9 @@ bp = Blueprint('accountlogout', __name__)
@bp.route('/action/accountlogout/logout', methods=['POST'])
@auth.require_level('viewer')
def logout():
sid = session.get('session_id', '')
if sid:
import config_utils as _cu
_cu.record_session_logout(sid)
session.clear()
return redirect('/overview')

View file

@ -6,7 +6,8 @@ from flask import session
APP_DIR = _os.path.dirname(_os.path.abspath(__file__))
CONFIGS_DIR = '/routlin_location'
WWW_DIR = '/www'
ACCOUNTS_FILE = f'{APP_DIR}/authorized_accounts.json'
ACCOUNTS_FILE = f'{CONFIGS_DIR}/.dashboard-accounts'
SESSIONS_DB = f'{CONFIGS_DIR}/.dashboard-sessions'
CONFIG_FILE = f'{CONFIGS_DIR}/config.json'
DASHBOARD_QUEUE = f'{CONFIGS_DIR}/.dashboard-queue'
DASHBOARD_DONE = f'{CONFIGS_DIR}/.dashboard-done'
@ -609,6 +610,72 @@ def revert_group_chain(group_uuid):
return errors, succeeded, failed
# Sessions DB =======================================================
def _open_sessions_db():
import sqlite3 as _sq, time as _t
con = _sq.connect(SESSIONS_DB, timeout=5)
con.execute('PRAGMA journal_mode=WAL')
con.executescript('''
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
email TEXT NOT NULL,
access_level TEXT NOT NULL DEFAULT '',
logged_in_at INTEGER NOT NULL,
last_seen INTEGER NOT NULL
);
''')
con.commit()
return con
def record_session_login(session_id, email, access_level):
import time as _t
try:
con = _open_sessions_db()
now = int(_t.time())
con.execute(
'INSERT OR REPLACE INTO sessions(session_id, email, access_level, logged_in_at, last_seen) VALUES(?,?,?,?,?)',
(session_id, email, access_level, now, now)
)
con.commit()
con.close()
except Exception:
pass
def record_session_logout(session_id):
try:
con = _open_sessions_db()
con.execute('DELETE FROM sessions WHERE session_id=?', (session_id,))
con.commit()
con.close()
except Exception:
pass
def record_session_activity(session_id):
import time as _t
try:
con = _open_sessions_db()
con.execute('UPDATE sessions SET last_seen=? WHERE session_id=?', (int(_t.time()), session_id))
con.commit()
con.close()
except Exception:
pass
def get_active_sessions():
import time as _t
cutoff = int(_t.time()) - 31 * 86400
try:
con = _open_sessions_db()
rows = con.execute(
'SELECT email, access_level, logged_in_at, last_seen FROM sessions WHERE last_seen > ? ORDER BY last_seen DESC',
(cutoff,)
).fetchall()
con.close()
return rows
except Exception:
return []
# Misc ==============================================================
def run_apply():

View file

@ -77,6 +77,9 @@ def serve_view(page_name):
view_req = view_def.get('client_requirement')
level = factory.client_level()
sid = session.get('session_id', '')
if sid and level > 0:
config_utils.record_session_activity(sid)
if not factory.passes(view_req, level):
return redirect('/overview' if level > 0 else '/accountlogin')

View file

@ -56,4 +56,10 @@ def form_login():
session['apply_changes_immediately'] = False
session.permanent = True
import uuid as _uuid
sid = str(_uuid.uuid4())
session['session_id'] = sid
import config_utils as _cu
_cu.record_session_login(sid, account['email_address'], account.get('access_level', 'viewer'))
return redirect('/overview')

View file

@ -66,6 +66,40 @@ def accounts_add():
return redirect(f'/{_PAGE}')
@bp.route('/action/accountmanage/accounts_edit', methods=['POST'])
@auth.require_level('manager')
def accounts_edit():
try:
row_index = int(request.form.get('row_index', ''))
except (ValueError, TypeError):
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
access_level = request.form.get('access_level', '').strip()
if access_level not in VALID_LEVELS:
flash('Invalid access level.', 'error')
return redirect(f'/{_PAGE}')
data = _load_accounts()
accounts = data.get('accounts', [])
if row_index < 0 or row_index >= len(accounts):
flash('Account not found.', 'error')
return redirect(f'/{_PAGE}')
target = accounts[row_index]
if target.get('email_address', '').lower() == session.get('email_address', '').lower():
flash('You cannot change your own access level.', 'error')
return redirect(f'/{_PAGE}')
accounts[row_index]['access_level'] = access_level
data['accounts'] = accounts
_save_accounts(data)
flash('Account updated.', 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/accountmanage/accounts_delete', methods=['POST'])
@auth.require_level('manager')
def accounts_delete():

View file

@ -11,38 +11,48 @@
]
},
{
"type": "table",
"datasource": "config:accounts",
"empty_message": "No accounts configured.",
"columns": [
"type": "card",
"label": "Active Sessions",
"items": [
{
"label": "Email Address",
"field": "email_address"
},
{
"label": "Access Level",
"field": "access_level"
},
{
"label": "Added By",
"field": "account_created_by"
},
{
"label": "Added",
"field": "account_created_utc"
},
{
"label": "Status",
"field": "account_status",
"render": "badge_active_inactive"
"type": "raw_html",
"html": "%ACTIVE_SESSIONS_TABLE%"
}
],
"row_actions": [
]
},
{
"type": "card",
"label": "User Accounts",
"items": [
{
"action": "/action/accountmanage/accounts_delete",
"method": "post",
"text": "Remove",
"class": "btn-danger btn-sm"
"type": "table",
"datasource": "config:accounts",
"empty_message": "No accounts configured.",
"columns": [
{"label": "Email Address", "field": "email_address"},
{"label": "Access Level", "field": "access_level"},
{"label": "Added By", "field": "account_created_by"},
{"label": "Added", "field": "account_created_utc"},
{
"label": "Status",
"field": "account_status",
"render": "badge_active_inactive"
}
],
"row_actions": [
{
"method": "js_edit",
"target": "edit-form",
"text": "Edit",
"class": "btn-ghost btn-sm"
},
{
"action": "/action/accountmanage/accounts_delete",
"method": "post",
"text": "Delete",
"class": "btn-danger btn-sm"
}
]
}
]
},
@ -84,6 +94,43 @@
]
}
]
},
{
"type": "card",
"id": "edit-form",
"label": "Edit Account",
"hidden": true,
"items": [
{
"type": "form",
"action": "/action/accountmanage/accounts_edit",
"method": "post",
"items": [
{"type": "hidden", "name": "row_index", "value": ""},
{
"type": "field",
"label": "Email Address",
"name": "email_address_display",
"input_type": "text",
"value": ""
},
{
"type": "field",
"label": "Access Level",
"name": "access_level",
"input_type": "select",
"options": "%ACCOUNT_LEVEL_OPTIONS%"
},
{
"type": "button_row",
"items": [
{"type": "button_primary", "text": "Save Changes"},
{"type": "button_cancel", "text": "Cancel"}
]
}
]
}
]
}
]
}
}

View file

@ -1,8 +1,45 @@
import json
import time
from datetime import datetime, timezone
import config_utils
import factory
def _fmt_ts(ts):
try:
dt = datetime.fromtimestamp(int(ts), tz=timezone.utc)
return dt.strftime('%Y-%m-%d %H:%M UTC')
except Exception:
return '-'
def _active_sessions_table():
rows = config_utils.get_active_sessions()
no_data = '<p class="text-muted" style="margin:0">No active sessions.</p>'
if not rows:
return no_data
now = int(time.time())
trs = ''
for email, access_level, logged_in_at, last_seen in rows:
ago = config_utils.relative_time(int(last_seen), now)
trs += (
f'<tr>'
f'<td class="table-cell">{factory.e(email)}</td>'
f'<td class="table-cell">{factory.e(access_level)}</td>'
f'<td class="table-cell">{_fmt_ts(logged_in_at)}</td>'
f'<td class="table-cell">{factory.e(ago)} ago</td>'
f'</tr>'
)
return (
'<table class="data-table"><thead><tr>'
'<th class="table-header">Email</th>'
'<th class="table-header">Access Level</th>'
'<th class="table-header">Logged In</th>'
'<th class="table-header">Last Seen</th>'
'</tr></thead><tbody>' + trs + '</tbody></table>'
)
def collect_tokens(cfg):
tokens = config_utils.collect_layout_tokens(cfg)
tokens['ACCOUNT_LEVEL_OPTIONS'] = json.dumps([
@ -10,6 +47,7 @@ def collect_tokens(cfg):
{'value': 'administrator', 'label': 'Administrator (can modify configuration)'},
{'value': 'manager', 'label': 'Manager (full access including account management)'},
])
tokens['ACTIVE_SESSIONS_TABLE'] = _active_sessions_table()
content = factory.load_json(f'{factory.PAGES_DIR}/accountmanage/content.json')
for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')