import pwd
from contextlib import closing
from os.path import join as path_join

from flask import current_app, g
from flask_login import UserMixin, current_user, login_user

from .model.read_mails import QMailAuthuser

# Lifetime of a session-store entry in seconds (25 hours). Every successful
# ``get`` on a session store pushes the expiry this far into the future again.
EXPIRATION_SEC = 60 * 60 * 25


class JWebmailUser(UserMixin):
    """flask_login user object identified by its mail address.

    The password is kept on the object because ``get_read_mails_logged_in``
    passes it on to ``QMailAuthuser`` for every request.
    """

    def __init__(self, mail_addr, password):
        self.id = mail_addr
        self.password = password


class RedisTimeoutSession:
    """Session store on a local Redis server with per-key expiry.

    Keys are stored under the ``jwm:user:`` prefix.
    """

    def __init__(self, username, passwd, timeout, port=6379):
        # Imported lazily so redis is only required when this backend is used.
        import redis

        self.timeout = timeout
        self.conn = redis.Redis(
            host="localhost",
            port=port,
            decode_responses=True,
            protocol=3,
            username=username,
            password=passwd,
        )

    def set(self, key, value):
        """Store *value* for *key*, expiring after ``self.timeout`` seconds."""
        self.conn.setex(f"jwm:user:{key}", self.timeout, value)

    def get(self, key):
        """Return the value stored for *key* and renew its expiry.

        The second positional argument of ``getex`` is the new expiry in
        seconds.
        """
        return self.conn.getex(f"jwm:user:{key}", self.timeout)

    def close(self):
        """Close the Redis connection."""
        self.conn.close()


class MysqlTimeoutSession:
    """Session store in a ``session`` table of a local MySQL database.

    The table is not created here; it must already exist with the columns
    ``user``, ``password`` and ``timeout`` used by the statements below.
    """

    def __init__(self, username, passwd, timeout, database="jwebmaildb1", port=3306):
        # Imported lazily so the connector is only required for this backend.
        import mysql.connector

        self.timeout = timeout
        self.conn = mysql.connector.connect(
            host="localhost",
            port=port,
            username=username,
            password=passwd,
            database=database,
        )

    def set(self, key, value):
        """Insert or replace the row for *key* with a fresh expiry."""
        with closing(self.conn.cursor()) as cur:
            cur.execute(
                "REPLACE INTO session VALUES (%s, %s, now() + INTERVAL %s SECOND)",
                [key, value, self.timeout],
            )
        self.conn.commit()

    def get(self, key):
        """Return the password stored for *key*, or ``None`` if there is none.

        Expired rows are purged first, then the expiry of the requested row
        is renewed before its password is read.
        """
        with closing(self.conn.cursor()) as cur:
            cur.execute("DELETE FROM session WHERE timeout < now()")
            cur.execute(
                "UPDATE session SET timeout = now() + INTERVAL %s SECOND WHERE user = %s",
                [self.timeout, key],
            )
            cur.execute("SELECT password FROM session WHERE user = %s", [key])
            row = cur.fetchone()
        self.conn.commit()
        return None if row is None else row[0]

    def close(self):
        """Close the MySQL connection."""
        self.conn.close()


class SqliteTimeoutSession:
    """Session store in a ``session`` table of a SQLite database file.

    The table and an index on its ``timeout`` column are created on first
    use. The user name and password arguments are accepted only so that the
    constructor matches the other backends; they are ignored.
    """

    def __init__(self, _username, _passwd, timeout, database):
        # Imported lazily, like the other backends' drivers.
        import sqlite3

        self.timeout = timeout
        self.conn = sqlite3.connect(database, isolation_level="IMMEDIATE")
        # Close the setup cursor deterministically, as the other methods do.
        with closing(self.conn.cursor()) as cur:
            cur.execute(
                "CREATE TABLE IF NOT EXISTS session "
                "(user text PRIMARY KEY, password text, timeout integer NOT NULL) STRICT"
            )
            cur.execute("CREATE INDEX IF NOT EXISTS timeout_idx ON session (timeout)")

    def set(self, key, value):
        """Insert or replace the row for *key* with a fresh expiry."""
        with closing(self.conn.cursor()) as cur:
            cur.execute(
                "INSERT OR REPLACE INTO session VALUES (?, ?, unixepoch('now', format('%d seconds', ?)))",
                [key, value, self.timeout],
            )
        self.conn.commit()

    def get(self, key):
        """Return the password stored for *key*, or ``None`` if there is none.

        Expired rows are purged first; the remaining row (if any) gets its
        expiry renewed and its password returned by the same statement.
        """
        with closing(self.conn.cursor()) as cur:
            cur.execute("DELETE FROM session WHERE timeout < unixepoch()")
            cur.execute(
                "UPDATE session SET timeout = unixepoch('now', format('%d seconds', ?)) WHERE user = ? RETURNING password",
                [self.timeout, key],
            )
            row = cur.fetchone()
        self.conn.commit()
        return None if row is None else row[0]

    def close(self):
        """Close the SQLite connection."""
        self.conn.close()


def _select_timeout_session():
    """Build the session store selected by the application configuration.

    Reads ``SESSION_TYPE`` (required), ``SESSION_STORE_PASSWD`` and
    ``SESSION_STORE_DB_NAME`` (both optional) from
    ``current_app.config["JWEBMAIL"]["READ_MAILS"]``.

    Raises:
        ValueError: if ``SESSION_TYPE`` is not ``REDIS``, ``MYSQL`` or ``SQLITE``.
    """
    config = current_app.config["JWEBMAIL"]["READ_MAILS"]
    session_type = config["SESSION_TYPE"]
    user = "jwebmail"
    passwd = config.get("SESSION_STORE_PASSWD")

    args = {}
    db_name = config.get("SESSION_STORE_DB_NAME")
    if db_name:
        args["database"] = db_name

    if session_type == "REDIS":
        # The Redis backend takes no database name.
        return RedisTimeoutSession(user, passwd, EXPIRATION_SEC)
    elif session_type == "MYSQL":
        return MysqlTimeoutSession(user, passwd, EXPIRATION_SEC, **args)
    elif session_type == "SQLITE":
        args.setdefault("database", "/var/local/lib/jwebmail/jwebmail.sqlite3")
        return SqliteTimeoutSession(user, passwd, EXPIRATION_SEC, **args)
    else:
        raise ValueError(f"unknown session_type {session_type!r}")


def _build_qma(username, password):
    """Create the ``QMailAuthuser`` for *username* from the app configuration.

    When ``VIRTUAL_USERS`` is configured it names an ASCII file with one
    ``domain:unix_user`` entry per line. The domain part of *username* is
    looked up there; the mailbox is the ``users/`` directory below the home
    directory of the matching unix user. Otherwise ``MAILBOX_USER`` and
    ``MAILBOX`` are taken from the configuration directly.

    Raises:
        ValueError: if virtual users are configured and *username* does not
            contain exactly one ``@``, its domain is not listed, or a
            non-blank line of the file is not of the form ``domain:unix_user``.
    """
    config = current_app.config["JWEBMAIL"]["READ_MAILS"]
    authenticator = config["AUTHENTICATOR"]
    backend = config["BACKEND"]

    virt_users = config.get("VIRTUAL_USERS")
    if virt_users:
        # The username comes from the login form; report a malformed address
        # explicitly instead of failing with an opaque unpacking error.
        _, sep, domain = username.partition("@")
        if not sep or "@" in domain:
            raise ValueError(f"malformed mail address {username!r}")

        with open(virt_users, encoding="ASCII") as file:
            for virt_dom in file:
                entry = virt_dom.rstrip()
                if not entry:
                    # Tolerate blank lines (e.g. a trailing newline).
                    continue
                dom, unix_user = entry.split(":")
                if dom == domain:
                    mailbox_user = unix_user
                    mailbox = path_join(pwd.getpwnam(unix_user).pw_dir, "users/")
                    break
            else:
                raise ValueError(f"unknown virtual domain {domain!r}")
    else:
        mailbox_user = config["MAILBOX_USER"]
        mailbox = config["MAILBOX"]

    return QMailAuthuser(
        username, password, backend, mailbox, mailbox_user, authenticator
    )


def login(username, password):
    """Verify the credentials and, on success, log the user in.

    The password is saved in the session store under *username* so that
    ``load_user`` can rebuild the user object later.

    Returns:
        ``True`` if the credentials were accepted, ``False`` otherwise.
    """
    if not _build_qma(username, password).verify_user():
        return False

    # ``closing`` releases the store connection even when ``set`` raises.
    with closing(_select_timeout_session()) as session:
        session.set(username, password)

    user = JWebmailUser(username, password)
    login_user(user)
    return True


def load_user(username: str) -> "JWebmailUser | None":
    """Rebuild the user object for *username* from the session store.

    Returns:
        The user, or ``None`` when the store holds no entry for *username*.
    """
    # ``closing`` releases the store connection even when ``get`` raises.
    with closing(_select_timeout_session()) as session:
        passwd = session.get(username)

    if passwd is None:
        return None
    return JWebmailUser(username, passwd)


def get_read_mails_logged_in():
    """Return the ``QMailAuthuser`` of the current user, cached on ``flask.g``."""
    if "read_mails" in g:
        return g.read_mails

    qma = _build_qma(current_user.get_id(), current_user.password)
    g.read_mails = qma
    return qma