Development

This commit is contained in:
Matthew Grotke 2026-06-10 14:23:47 -04:00
parent f5722f3c7b
commit d60bf15ce4
15 changed files with 367 additions and 285 deletions

View file

@ -1,6 +1,6 @@
from pathlib import Path
from flask import Blueprint, request, session, redirect, flash
import json, os, re, sqlite3
import os, re, sqlite3
from datetime import datetime, timezone
import auth
import config_utils
@ -10,24 +10,31 @@ _PAGE = Path(__file__).parent.name
bp = Blueprint(_PAGE, __name__)
VALID_LEVELS = {'viewer', 'administrator', 'manager'}
VALID_LEVELS = {'viewer': 1, 'administrator': 2, 'manager': 3}
def _load_accounts():
@bp.route('/action/accountmanage/session_invalidate', methods=['POST'])
@auth.require_level('manager')
def session_invalidate():
sid = request.form.get('session_id', '').strip()
if not sid:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
try:
with open(config_utils.ACCOUNTS_FILE) as f:
return json.load(f)
con = config_utils.open_accounts_db()
con.execute('DELETE FROM sessions WHERE session_id=?', (sid,))
con.commit()
con.close()
flash('Session invalidated.', 'success')
except Exception:
return {'accounts': []}
def _save_accounts(data):
with open(config_utils.ACCOUNTS_FILE, 'w') as f:
json.dump(data, f, indent=2)
flash('Failed to invalidate session.', 'error')
return redirect(f'/{_PAGE}')
@bp.route('/action/accountmanage/accounts_add', methods=['POST'])
@auth.require_level('manager')
def accounts_add():
import uuid as _uuid, time as _t
email = sanitize.email(request.form.get('email_address', ''))
access_level = request.form.get('access_level', '').strip()
@ -43,26 +50,24 @@ def accounts_add():
flash('Invalid access level.', 'error')
return redirect(f'/{_PAGE}')
data = _load_accounts()
accounts = data.get('accounts', [])
if any(a.get('email_address', '').lower() == email for a in accounts):
if config_utils.get_account_by_email(email):
flash('An account with that email address already exists.', 'error')
return redirect(f'/{_PAGE}')
now = datetime.now(tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
accounts.append({
'email_address': email,
'access_level': access_level,
'account_created_utc': now,
'account_created_by': session.get('email_address', ''),
'hashed_password': '',
'timezone': '',
})
data['accounts'] = accounts
_save_accounts(data)
try:
con = config_utils.open_accounts_db()
con.execute(
'INSERT INTO accounts(account_id,email,access_level,created_ts,created_by) VALUES(?,?,?,?,?)',
(str(_uuid.uuid4()), email, VALID_LEVELS[access_level], int(_t.time()),
session.get('email_address', ''))
)
con.commit()
con.close()
except Exception as exc:
flash(f'Could not add account: {exc}', 'error')
return redirect(f'/{_PAGE}')
flash(f'Authorization added for {email}. User must complete account setup via the Create Account page.', 'success')
flash(f'Authorization added for {email}.', 'success')
return redirect(f'/{_PAGE}')
@ -80,9 +85,7 @@ def accounts_edit():
flash('Invalid access level.', 'error')
return redirect(f'/{_PAGE}')
data = _load_accounts()
accounts = data.get('accounts', [])
accounts = config_utils.list_accounts()
if row_index < 0 or row_index >= len(accounts):
flash('Account not found.', 'error')
return redirect(f'/{_PAGE}')
@ -92,29 +95,19 @@ def accounts_edit():
flash('You cannot change your own access level.', 'error')
return redirect(f'/{_PAGE}')
accounts[row_index]['access_level'] = access_level
data['accounts'] = accounts
_save_accounts(data)
flash('Account updated.', 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/accountmanage/session_invalidate', methods=['POST'])
@auth.require_level('manager')
def session_invalidate():
sid = request.form.get('session_id', '').strip()
if not sid:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
try:
con = sqlite3.connect(config_utils.SESSIONS_DB, timeout=5)
con.execute('DELETE FROM sessions WHERE session_id=?', (sid,))
con = config_utils.open_accounts_db()
con.execute(
'UPDATE accounts SET access_level=? WHERE account_id=?',
(VALID_LEVELS[access_level], target['account_id'])
)
con.commit()
con.close()
flash('Session invalidated.', 'success')
except Exception:
flash('Failed to invalidate session.', 'error')
except Exception as exc:
flash(f'Could not update account: {exc}', 'error')
return redirect(f'/{_PAGE}')
flash('Account updated.', 'success')
return redirect(f'/{_PAGE}')
@ -127,26 +120,29 @@ def accounts_delete():
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
data = _load_accounts()
accounts = data.get('accounts', [])
accounts = config_utils.list_accounts()
if row_index < 0 or row_index >= len(accounts):
flash('Account not found.', 'error')
return redirect(f'/{_PAGE}')
target = accounts[row_index]
target = accounts[row_index]
target_email = target.get('email_address', '').lower()
current_email = session.get('email_address', '').lower()
initial_email = os.environ.get('INITIAL_MANAGER_EMAIL', '').strip().lower()
if target_email == current_email and target_email != initial_email:
flash('You cannot remove your own account.', 'error')
return redirect(f'/{_PAGE}')
removed_email = target.get('email_address', '')
accounts.pop(row_index)
data['accounts'] = accounts
_save_accounts(data)
try:
con = config_utils.open_accounts_db()
con.execute('DELETE FROM sessions WHERE account_id=?', (target['account_id'],))
con.execute('DELETE FROM accounts WHERE account_id=?', (target['account_id'],))
con.commit()
con.close()
except Exception as exc:
flash(f'Could not delete account: {exc}', 'error')
return redirect(f'/{_PAGE}')
flash(f'Account for {removed_email} has been removed.', 'success')
flash(f'Account for {target["email_address"]} has been removed.', 'success')
return redirect(f'/{_PAGE}')

View file

@ -32,7 +32,7 @@
{"label": "Email Address", "field": "email_address"},
{"label": "Access Level", "field": "access_level"},
{"label": "Added By", "field": "account_created_by"},
{"label": "Added", "field": "account_created_utc"},
{"label": "Added", "field": "account_created_ts"},
{
"label": "Status",
"field": "account_status",

View file

@ -1,25 +1,33 @@
import json
import sqlite3
import time
from datetime import datetime, timezone
from datetime import datetime
import config_utils
import factory
def _fmt_ts(ts):
def _fmt_ts(ts, now):
try:
dt = datetime.fromtimestamp(int(ts), tz=timezone.utc)
return dt.strftime('%Y-%m-%d %H:%M UTC')
dt = datetime.fromtimestamp(int(ts))
ago = config_utils.relative_time(int(ts), now)
return f'{dt.strftime("%Y-%m-%d %H:%M")} ({ago} ago)'
except Exception:
return '-'
_LEVEL_INT_TO_STR = {0: 'nothing', 1: 'viewer', 2: 'administrator', 3: 'manager'}
def _active_sessions_table():
try:
con = sqlite3.connect(config_utils.SESSIONS_DB, timeout=5)
con = sqlite3.connect(config_utils.ACCOUNTS_DB, timeout=5)
con.row_factory = sqlite3.Row
rows = con.execute(
'SELECT session_id, email, access_level, created_at, last_seen'
' FROM sessions ORDER BY last_seen DESC'
'''SELECT s.session_id, a.email, a.access_level,
s.session_started_ts, s.last_seen_ts
FROM sessions s
JOIN accounts a ON a.account_id = s.account_id
ORDER BY s.last_seen_ts DESC'''
).fetchall()
con.close()
except Exception:
@ -30,12 +38,21 @@ def _active_sessions_table():
now = int(time.time())
trs = ''
for sid, email, access_level, created_at, last_seen in rows:
online = (now - int(last_seen)) < 300
badge = (
'<span class="badge badge-enabled">Online</span>'
if online else
'<span class="badge badge-disabled">Offline</span>'
for row in rows:
sid = row['session_id']
email = row['email']
access_level = _LEVEL_INT_TO_STR.get(row['access_level'], 'viewer')
started_ts = row['session_started_ts']
last_seen = row['last_seen_ts']
online = (now - int(last_seen)) < 300
ago = config_utils.relative_time(int(last_seen), now)
tip = factory.e(f'Last seen {ago} ago')
badge_cls = 'badge-enabled' if online else 'badge-disabled'
badge_lbl = 'Online' if online else 'Offline'
badge = (
f'<span class="tooltip-wrap" data-tooltip="{tip}">'
f'<span class="badge {badge_cls}">{badge_lbl}</span>'
f'</span>'
)
btn = (
f'<form method="post" action="/action/accountmanage/session_invalidate"'
@ -49,8 +66,7 @@ def _active_sessions_table():
f'<td class="table-cell">{factory.e(email)}</td>'
f'<td class="table-cell">{factory.e(access_level)}</td>'
f'<td class="table-cell">{badge}</td>'
f'<td class="table-cell">{_fmt_ts(created_at)}</td>'
f'<td class="table-cell">{_fmt_ts(last_seen)}</td>'
f'<td class="table-cell">{_fmt_ts(started_ts, now)}</td>'
f'<td class="table-cell">{btn}</td>'
f'</tr>'
)
@ -60,7 +76,6 @@ def _active_sessions_table():
'<th class="table-header">Access Level</th>'
'<th class="table-header">Status</th>'
'<th class="table-header">Logged In</th>'
'<th class="table-header">Last Seen</th>'
'<th class="table-header"></th>'
'</tr></thead><tbody>' + trs + '</tbody></table>'
)