import json import sqlite3 import time import uuid from flask.sessions import SessionInterface, SessionMixin from werkzeug.datastructures import CallbackDict class SqliteSession(CallbackDict, SessionMixin): def __init__(self, initial=None, sid=None, new=False): def on_update(self): self.modified = True CallbackDict.__init__(self, initial or {}, on_update) self.sid = sid self.new = new self.modified = False class SqliteSessionInterface(SessionInterface): def __init__(self, db_path): self.db_path = db_path self._init_db() def _connect(self): con = sqlite3.connect(self.db_path, timeout=5) con.execute('PRAGMA journal_mode=WAL') return con def _init_db(self): try: con = self._connect() con.execute(''' CREATE TABLE IF NOT EXISTS sessions ( session_id TEXT PRIMARY KEY, email TEXT NOT NULL DEFAULT '', access_level TEXT NOT NULL DEFAULT '', data_json TEXT NOT NULL DEFAULT '{}', created_at INTEGER NOT NULL, last_seen INTEGER NOT NULL ) ''') con.commit() con.close() except Exception: pass def open_session(self, app, request): name = app.config.get('SESSION_COOKIE_NAME', 'session') sid = request.cookies.get(name) if sid: try: con = self._connect() row = con.execute( 'SELECT data_json FROM sessions WHERE session_id=?', (sid,) ).fetchone() con.close() if row: return SqliteSession(json.loads(row[0]), sid=sid, new=False) except Exception: pass return SqliteSession(sid=str(uuid.uuid4()), new=True) def save_session(self, app, session, response): name = app.config.get('SESSION_COOKIE_NAME', 'session') domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) if not session: if not session.new: try: con = self._connect() con.execute('DELETE FROM sessions WHERE session_id=?', (session.sid,)) con.commit() con.close() except Exception: pass response.delete_cookie(name, domain=domain, path=path) return now = int(time.time()) email = session.get('email_address', '') level = session.get('access_level', '') try: con = self._connect() if session.new: if not session.modified: con.close() return con.execute( 'INSERT INTO sessions(session_id,email,access_level,data_json,created_at,last_seen)' ' VALUES(?,?,?,?,?,?)', (session.sid, email, level, json.dumps(dict(session)), now, now) ) elif session.modified: con.execute( 'UPDATE sessions SET email=?,access_level=?,data_json=?,last_seen=? WHERE session_id=?', (email, level, json.dumps(dict(session)), now, session.sid) ) else: con.execute( 'UPDATE sessions SET last_seen=? WHERE session_id=?', (now, session.sid) ) con.commit() con.close() except Exception: pass response.set_cookie( name, session.sid, expires=self.get_expiration_time(app, session), httponly=self.get_cookie_httponly(app), domain=domain, path=path, secure=self.get_cookie_secure(app), samesite=self.get_cookie_samesite(app), )