from contextlib import closing from datetime import datetime, timedelta from flask import current_app, g from flask_login import UserMixin, current_user from .model.read_mails import QMailAuthuser EXPIRATION_SEC = 60 * 60 * 25 class JWebmailUser(UserMixin): def __init__(self, mail_addr, password): self.id = mail_addr self.password = password class RedisTimeoutSession: def __init__(self, username, passwd, timeout, port=6379): import redis self.timeout = timeout self.conn = redis.Redis( host="localhost", port=port, decode_responses=True, protocol=3, username=username, password=passwd, ) def set(self, key, value): self.conn.setex(f"jwm:user:{key}", self.timeout, value) def get(self, key): return self.conn.getex(f"jwm:user:{key}", self.timeout) def close(self): self.conn.close() class MysqlTimeoutSession: def __init__(self, username, passwd, timeout, database="jwebmaildb1", port=3306): import mysql.connector self.timeout = timeout self.conn = mysql.connector.connect( host="localhost", port=port, username=username, password=passwd, database=database, ) def set(self, key, value): timeout = datetime.now() + timedelta(seconds=self.timeout) with closing(self.conn.cursor()) as cur: cur.execute("DELETE FROM session WHERE user = %s", [key]) cur.execute( "INSERT INTO session VALUES (%s, %s, %s)", [key, value, timeout] ) self.conn.commit() def get(self, key): with closing(self.conn.cursor()) as cur: cur.execute("DELETE FROM session WHERE timeout < NOW()") cur.execute("SELECT password FROM session WHERE user = %s", [key]) row = cur.fetchone() if row is None: self.conn.commit() return None else: timeout = datetime.now() + timedelta(seconds=self.timeout) cur.execute( "UPDATE session SET timeout = %s WHERE user = %s", [timeout, key] ) self.conn.commit() return row[0] def close(self): self.conn.close() class SqliteTimeoutSession: def __init__(self, _username, _passwd, timeout, database): import sqlite3 self.timeout = timeout self.conn = sqlite3.connect(database) cur = self.conn.cursor() cur.execute( "CREATE TABLE IF NOT EXISTS session (user text PRIMARY KEY, password text, timeout integer NOT NULL) STRICT" ) cur.execute("CREATE INDEX IF NOT EXISTS timeout_idx ON session (timeout)") def set(self, key, value): timeout = datetime.now() + timedelta(seconds=self.timeout) with closing(self.conn.cursor()) as cur: cur.execute( "REPLACE INTO session VALUES (?, ?, unixepoch(?))", [key, value, timeout], ) self.conn.commit() def get(self, key): with closing(self.conn.cursor()) as cur: cur.execute("DELETE FROM session WHERE timeout < unixepoch()") cur.execute("SELECT password FROM session WHERE user = ?", [key]) row = cur.fetchone() if row is None: self.conn.commit() return None else: timeout = datetime.now() + timedelta(seconds=self.timeout) cur.execute( "UPDATE session SET timeout = unixepoch(?) WHERE user = ?", [timeout, key], ) self.conn.commit() return row[0] def close(self): self.conn.close() def select_timeout_session(): session_type = current_app.config["JWEBMAIL"]["READ_MAILS"]["SESSION_TYPE"] user = "jwebmail" passwd = current_app.config["JWEBMAIL"]["READ_MAILS"]["SESSION_STORE_PASSWD"] args = dict() db_name = current_app.config["JWEBMAIL"]["READ_MAILS"].get("SESSION_STORE_DB_NAME") if db_name: args["database"] = db_name if session_type == "REDIS": return RedisTimeoutSession(user, passwd, EXPIRATION_SEC) elif session_type == "MYSQL": return MysqlTimeoutSession(user, passwd, EXPIRATION_SEC, **args) elif session_type == "SQLITE": args.setdefault("database", "/var/local/lib/jwebmail/jwebmail.sqlite3") return SqliteTimeoutSession(user, passwd, EXPIRATION_SEC, **args) else: raise ValueError(f"unknown session_type {session_type!r}") def build_qma(username, password): authenticator = current_app.config["JWEBMAIL"]["READ_MAILS"]["AUTHENTICATOR"] backend = current_app.config["JWEBMAIL"]["READ_MAILS"]["BACKEND"] mailbox = current_app.config["JWEBMAIL"]["READ_MAILS"]["MAILBOX"] mailbox_user = current_app.config["JWEBMAIL"]["READ_MAILS"]["MAILBOX_USER"] return QMailAuthuser( username, password, backend, mailbox, mailbox_user, authenticator ) def login(username, password): return build_qma(username, password).verify_user() def add_user(user: JWebmailUser): r = select_timeout_session() r.set(user.get_id(), user.password) r.close() def load_user(username: str) -> JWebmailUser: r = select_timeout_session() passwd = r.get(username) r.close() if passwd is None: return None return JWebmailUser(username, passwd) def get_read_mails_logged_in(): if "read_mails" in g: return g.read_mails qma = build_qma(current_user.get_id(), current_user.password) g.read_mails = qma return qma