"""Login handling and expiring credential stores for JWebmail."""

import pwd
from contextlib import closing
from os.path import join as path_join
from typing import Optional

from flask import current_app
from flask_login import UserMixin, current_user, login_user

from .model.read_mails import QMailAuthuser, QMAuthError

# Lifetime of a stored session in seconds (25 hours).
EXPIRATION_SEC = 60 * 60 * 25


class JWebmailUser(UserMixin):
    """flask-login user identified by its mail address.

    The password is kept on the object because
    ``get_read_mails_logged_in`` passes it to the mail backend.
    """

    def __init__(self, mail_addr, password):
        self.id = mail_addr
        self.password = password


class RedisTimeoutSession:
    """Expiring key/value store on a Redis server at ``localhost``.

    Keys are stored under the ``jwm:user:`` prefix.
    """

    def __init__(self, username, passwd, timeout, port=6379):
        # Imported lazily so redis is only required when this backend is
        # actually selected.
        import redis

        self.timeout = timeout
        self.conn = redis.Redis(
            host="localhost",
            port=port,
            decode_responses=True,
            protocol=3,
            username=username,
            password=passwd,
        )

    def set(self, key, value):
        """Store *value* for *key*, expiring after ``self.timeout`` seconds."""
        self.conn.setex(f"jwm:user:{key}", self.timeout, value)

    def get(self, key):
        """Return the value stored for *key* and restart its expiry timer."""
        return self.conn.getex(f"jwm:user:{key}", ex=self.timeout)

    def close(self):
        """Close the Redis connection."""
        self.conn.close()


class MysqlTimeoutSession:
    """Expiring key/value store in the ``session`` table of a MySQL database.

    The table itself is not created here; the queries below use its
    ``user``, ``password`` and ``timeout`` columns.
    """

    def __init__(self, username, passwd, timeout, database="jwebmaildb1", port=3306):
        # Imported lazily so the MySQL driver is only required when this
        # backend is actually selected.
        import mysql.connector

        self.timeout = timeout
        self.conn = mysql.connector.connect(
            host="localhost",
            port=port,
            username=username,
            password=passwd,
            database=database,
        )

    def set(self, key, value):
        """Insert or replace the row for *key* with a fresh expiry time."""
        with closing(self.conn.cursor()) as cur:
            cur.execute(
                "REPLACE INTO session VALUES (%s, %s, now() + INTERVAL %s SECOND)",
                [key, value, self.timeout],
            )
            self.conn.commit()

    def get(self, key):
        """Return the value stored for *key*, or ``None`` if absent or expired.

        Expired rows are purged first; a surviving row has its expiry
        time pushed forward before it is read.
        """
        with closing(self.conn.cursor()) as cur:
            cur.execute("DELETE FROM session WHERE timeout < now()")
            cur.execute(
                "UPDATE session SET timeout = now() + INTERVAL %s SECOND WHERE user = %s",
                [self.timeout, key],
            )
            cur.execute("SELECT password FROM session WHERE user = %s", [key])
            row = cur.fetchone()
            self.conn.commit()
        return None if row is None else row[0]

    def close(self):
        """Close the MySQL connection."""
        self.conn.close()


class SqliteTimeoutSession:
    """Expiring key/value store in a SQLite database file.

    ``_username`` and ``_passwd`` are accepted only so that the
    constructor can be called like the other backends; they are unused.
    """

    def __init__(self, _username, _passwd, timeout, database):
        import sqlite3

        self.timeout = timeout
        self.conn = sqlite3.connect(database, isolation_level="IMMEDIATE")
        # Create the schema on first use; both statements do nothing if
        # the table and index already exist.
        with closing(self.conn.cursor()) as cur:
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS session (
                    user text PRIMARY KEY,
                    password text,
                    timeout integer NOT NULL
                ) STRICT
                """
            )
            cur.execute("CREATE INDEX IF NOT EXISTS timeout_idx ON session (timeout)")

    def set(self, key, value):
        """Insert or replace the row for *key* with a fresh expiry time."""
        with closing(self.conn.cursor()) as cur:
            cur.execute(
                """
                INSERT OR REPLACE INTO session
                VALUES (?, ?, unixepoch('now', format('%d seconds', ?)))
                """,
                [key, value, self.timeout],
            )
            self.conn.commit()

    def get(self, key):
        """Return the value stored for *key*, or ``None`` if absent or expired.

        Expired rows are purged first; the UPDATE then pushes the expiry
        time forward and hands back the stored value in one statement.
        """
        with closing(self.conn.cursor()) as cur:
            cur.execute("DELETE FROM session WHERE timeout < unixepoch()")
            cur.execute(
                """
                UPDATE session
                SET timeout = unixepoch('now', format('%d seconds', ?))
                WHERE user = ?
                RETURNING password
                """,
                [self.timeout, key],
            )
            # The RETURNING row has to be fetched before the commit.
            row = cur.fetchone()
            self.conn.commit()
        return None if row is None else row[0]

    def close(self):
        """Close the SQLite connection."""
        self.conn.close()


def _select_timeout_session():
    """Open the session store selected by ``READ_MAILS.SESSION_TYPE``.

    Returns:
        A new store object exposing ``set``, ``get`` and ``close``; the
        caller is responsible for closing it.

    Raises:
        ValueError: if the configured session type is not one of
            ``REDIS``, ``MYSQL`` or ``SQLITE``.
    """
    cfg = current_app.config["JWEBMAIL"]["READ_MAILS"]
    session_type = cfg["SESSION_TYPE"]
    user = "jwebmail"
    passwd = cfg.get("SESSION_STORE_PASSWD")

    args = {}
    db_name = cfg.get("SESSION_STORE_DB_NAME")
    if db_name:
        args["database"] = db_name

    if session_type == "REDIS":
        # The Redis backend takes no database name, so `args` is not passed.
        return RedisTimeoutSession(user, passwd, EXPIRATION_SEC)
    if session_type == "MYSQL":
        return MysqlTimeoutSession(user, passwd, EXPIRATION_SEC, **args)
    if session_type == "SQLITE":
        args.setdefault("database", "/var/local/lib/jwebmail/jwebmail.sqlite3")
        return SqliteTimeoutSession(user, passwd, EXPIRATION_SEC, **args)
    raise ValueError(f"unknown session_type {session_type!r}")


def _build_qma(domain):
    """Build the ``QMailAuthuser`` responsible for mail of *domain*.

    When ``READ_MAILS.VIRTUAL_USERS`` names a file, it is read as
    ``domain:unix_user`` lines and the mailbox is the ``users/``
    directory inside that unix user's home directory. Otherwise the
    ``MAILBOX_USER`` and ``MAILBOX`` settings are used and *domain* is
    ignored.

    Raises:
        ValueError: if a virtual-users file is configured but has no
            entry for *domain*, or contains a non-blank line that is not
            exactly ``domain:unix_user``.
        KeyError: if the matching unix user does not exist (raised by
            ``pwd.getpwnam``).
    """
    cfg = current_app.config["JWEBMAIL"]["READ_MAILS"]
    authenticator = cfg["AUTHENTICATOR"]
    backend = cfg["BACKEND"]

    virt_users = cfg.get("VIRTUAL_USERS")
    if virt_users:
        with open(virt_users, encoding="ASCII") as file:
            for virt_dom in file:
                entry = virt_dom.rstrip()
                if not entry:
                    # Tolerate blank lines (e.g. a trailing newline at EOF).
                    continue
                dom, unix_user = entry.split(":")
                if dom == domain:
                    mailbox_user = unix_user
                    mailbox = path_join(pwd.getpwnam(unix_user).pw_dir, "users/")
                    break
            else:
                # Loop finished without `break`: no line matched the domain.
                raise ValueError(f"unknown virtual domain {domain!r}")
    else:
        mailbox_user = cfg["MAILBOX_USER"]
        mailbox = cfg["MAILBOX"]

    return QMailAuthuser(backend, mailbox, mailbox_user, authenticator)


def login(username, password):
    """Verify the credentials and, on success, log the user in.

    On success the password is written to the session store and the
    user is registered with flask-login.

    Returns:
        ``True`` on success. ``False`` if *username* does not contain
        exactly one "@" or if the backend raises ``QMAuthError`` with
        ``rc == 1``.

    Raises:
        QMAuthError: any backend error with ``rc != 1`` is re-raised.
    """
    try:
        _, domain = username.split("@")
    except ValueError:
        # A malformed address from the login form is a failed login,
        # not a server error.
        return False

    try:
        _build_qma(domain).open(username, password)
    except QMAuthError as err:
        # NOTE(review): rc == 1 presumably means "bad credentials";
        # confirm against QMailAuthuser.
        if err.rc == 1:
            return False
        raise

    # NOTE(review): the password is stored as given (plain text) in the
    # session backend.
    # `closing` releases the connection even if `set` raises.
    with closing(_select_timeout_session()) as store:
        store.set(username, password)

    login_user(JWebmailUser(username, password))
    return True


def load_user(username: str) -> Optional[JWebmailUser]:
    """Rebuild the user object for *username* from the session store.

    Reading the entry also extends its lifetime in the store.

    Returns:
        The user, or ``None`` when no unexpired session entry exists.
    """
    # `closing` releases the connection even if `get` raises.
    with closing(_select_timeout_session()) as store:
        passwd = store.get(username)

    if passwd is None:
        return None
    return JWebmailUser(username, passwd)


def get_read_mails_logged_in():
    """Open the mail backend for the currently logged-in user.

    Returns:
        Whatever ``QMailAuthuser.open`` returns for the current user's
        address and stored password.
    """
    username = current_user.get_id()
    _, domain = username.split("@")
    return _build_qma(domain).open(username, current_user.password)