"""Login/session helpers: keep a user's mail password in a timed session store.

The store backend (Redis or MySQL) is chosen from the Flask application
configuration under ``JWEBMAIL -> READ_MAILS -> SESSION_TYPE``.
"""

from contextlib import closing
from datetime import datetime, timedelta
from typing import Optional

from flask import current_app, g
from flask_login import UserMixin, current_user

from .model.read_mails import QMailAuthuser

# Lifetime of a stored session entry in seconds (25 hours).
EXPIRATION_SEC = 60 * 60 * 25


class JWebmailUser(UserMixin):
    """Flask-Login user whose id is the mail address; also carries the password."""

    def __init__(self, mail_addr, password):
        self.id = mail_addr
        self.password = password


class _ClosableSessionStore:
    """Shared close/context-manager behaviour for stores exposing ``self.conn``."""

    def close(self):
        """Close the underlying connection held in ``self.conn``."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always release the connection; never swallow the exception.
        self.close()


class RedisTimeoutSession(_ClosableSessionStore):
    """Session store on a local Redis server; entries expire after ``timeout`` s."""

    def __init__(self, username, passwd, timeout, port=6379):
        # Imported lazily so the redis package is only needed when this
        # backend is actually selected.
        import redis

        self.timeout = timeout
        self.conn = redis.Redis(
            host="localhost",
            port=port,
            decode_responses=True,
            protocol=3,
            username=username,
            password=passwd,
        )

    def set(self, key, value):
        """Store ``value`` for ``key`` with an expiry of ``self.timeout`` seconds."""
        self.conn.setex(f"jwm:user:{key}", self.timeout, value)

    def get(self, key):
        """Return the value for ``key`` and reset its expiry to ``self.timeout``."""
        return self.conn.getex(f"jwm:user:{key}", ex=self.timeout)


class MysqlTimeoutSession(_ClosableSessionStore):
    """Session store on a local MySQL server using a ``session`` table.

    The statements below use the columns ``user``, ``password`` and
    ``timeout``; the INSERT relies on the table's column order being
    (user, password, timeout) -- TODO confirm against the schema.
    """

    def __init__(self, username, passwd, timeout, database="jwebmaildb1", port=3306):
        # Imported lazily so mysql.connector is only needed when this
        # backend is actually selected.
        import mysql.connector

        self.timeout = timeout
        self.conn = mysql.connector.connect(
            host="localhost",
            port=port,
            username=username,
            password=passwd,
            database=database,
        )

    def _new_deadline(self):
        """Return the expiry timestamp for an entry written/refreshed now."""
        # NOTE(review): computed with the application's clock, while expired
        # rows are purged using the database's NOW() -- confirm both agree.
        return datetime.now() + timedelta(seconds=self.timeout)

    def set(self, key, value):
        """Replace any existing row for ``key`` with ``value`` and a new expiry."""
        deadline = self._new_deadline()
        with closing(self.conn.cursor()) as cur:
            cur.execute("DELETE FROM session WHERE user = %s", [key])
            cur.execute(
                "INSERT INTO session VALUES (%s, %s, %s)", [key, value, deadline]
            )
        self.conn.commit()

    def get(self, key):
        """Return the stored value for ``key`` (refreshing its expiry) or None.

        Expired rows of all users are deleted first.
        """
        with closing(self.conn.cursor()) as cur:
            cur.execute("DELETE FROM session WHERE timeout < NOW()")
            cur.execute("SELECT password FROM session WHERE user = %s", [key])
            row = cur.fetchone()

            if row is None:
                # Still commit so the purge of expired rows is persisted.
                self.conn.commit()
                return None

            cur.execute(
                "UPDATE session SET timeout = %s WHERE user = %s",
                [self._new_deadline(), key],
            )
            self.conn.commit()
            return row[0]


def _read_mails_config():
    """Return the ``JWEBMAIL -> READ_MAILS`` section of the app configuration."""
    return current_app.config["JWEBMAIL"]["READ_MAILS"]


def select_timeout_session():
    """Return the session store class selected by ``SESSION_TYPE``.

    Raises:
        ValueError: if ``SESSION_TYPE`` is neither ``"REDIS"`` nor ``"MYSQL"``.
    """
    session_type = _read_mails_config()["SESSION_TYPE"]
    if session_type == "REDIS":
        return RedisTimeoutSession
    if session_type == "MYSQL":
        return MysqlTimeoutSession
    raise ValueError(f"unknown session_type {session_type!r}")


def _open_session_store():
    """Create a session store instance using the configured store password."""
    passwd = _read_mails_config()["SESSION_STORE_PASSWD"]
    return select_timeout_session()("jwebmail", passwd, EXPIRATION_SEC)


def build_qma(username, password):
    """Build a ``QMailAuthuser`` for the given credentials from the app config."""
    config = _read_mails_config()
    authenticator = config["AUTHENTICATOR"]
    backend = config["BACKEND"]
    mailbox = config["MAILBOX"]
    mailbox_user = config["MAILBOX_USER"]
    return QMailAuthuser(
        username, password, backend, mailbox, mailbox_user, authenticator
    )


def login(username, password):
    """Return the result of ``verify_user()`` for the given credentials."""
    return build_qma(username, password).verify_user()


def add_user(user: JWebmailUser):
    """Store the user's password in the session store, keyed by the user id."""
    # The store is opened per call; close it again so connections do not
    # accumulate over the lifetime of the process.
    with _open_session_store() as store:
        store.set(user.get_id(), user.password)


def load_user(username: str) -> Optional[JWebmailUser]:
    """Rebuild the user from the session store.

    Returns:
        A ``JWebmailUser`` when the store holds an entry for ``username``,
        otherwise ``None``.
    """
    # NOTE(review): presumably registered as the Flask-Login user loader
    # elsewhere -- confirm against the application setup.
    with _open_session_store() as store:
        passwd = store.get(username)
    if passwd is None:
        return None
    return JWebmailUser(username, passwd)


def get_read_mails_logged_in():
    """Return the ``QMailAuthuser`` for the current user, cached on ``flask.g``."""
    if "read_mails" in g:
        return g.read_mails
    qma = build_qma(current_user.get_id(), current_user.password)
    g.read_mails = qma
    return qma