summaryrefslogtreecommitdiff
path: root/src/jwebmail/read_mails.py
blob: 5aed8d26a11eb75cf4c9f28898e2ad0dd3adfe89 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import pwd
from contextlib import closing
from os.path import join as path_join

from flask import current_app, g
from flask_login import UserMixin, current_user, login_user

from .model.read_mails import QMailAuthuser, QMAuthError

EXPIRATION_SEC = 60 * 60 * 25


class JWebmailUser(UserMixin):
    def __init__(self, mail_addr, password):
        self.id = mail_addr
        self.password = password


class RedisTimeoutSession:
    def __init__(self, username, passwd, timeout, port=6379):
        import redis

        self.timeout = timeout
        self.conn = redis.Redis(
            host="localhost",
            port=port,
            decode_responses=True,
            protocol=3,
            username=username,
            password=passwd,
        )

    def set(self, key, value):
        self.conn.setex(f"jwm:user:{key}", self.timeout, value)

    def get(self, key):
        return self.conn.getex(f"jwm:user:{key}", self.timeout)

    def close(self):
        self.conn.close()


class MysqlTimeoutSession:
    def __init__(self, username, passwd, timeout, database="jwebmaildb1", port=3306):
        import mysql.connector

        self.timeout = timeout
        self.conn = mysql.connector.connect(
            host="localhost",
            port=port,
            username=username,
            password=passwd,
            database=database,
        )

    def set(self, key, value):
        with closing(self.conn.cursor()) as cur:
            cur.execute(
                "REPLACE INTO session VALUES (%s, %s, now() + INTERVAL %s SECOND)",
                [key, value, self.timeout],
            )
            self.conn.commit()

    def get(self, key):
        with closing(self.conn.cursor()) as cur:
            cur.execute("DELETE FROM session WHERE timeout < now()")
            cur.execute(
                "UPDATE session SET timeout = now() + INTERVAL %s SECOND WHERE user = %s",
                [self.timeout, key],
            )
            cur.execute("SELECT password FROM session WHERE user = %s", [key])
            row = cur.fetchone()
            self.conn.commit()

            return None if row is None else row[0]

    def close(self):
        self.conn.close()


class SqliteTimeoutSession:
    def __init__(self, _username, _passwd, timeout, database):
        import sqlite3

        self.timeout = timeout

        self.conn = sqlite3.connect(database, isolation_level="IMMEDIATE")
        cur = self.conn.cursor()
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS session (
                user text PRIMARY KEY,
                password text,
                timeout integer NOT NULL
            ) STRICT
            """
        )
        cur.execute("CREATE INDEX IF NOT EXISTS timeout_idx ON session (timeout)")

    def set(self, key, value):
        with closing(self.conn.cursor()) as cur:
            cur.execute(
                """
                INSERT OR REPLACE INTO session
                VALUES (?, ?, unixepoch('now', format('%d seconds', ?)))
                """,
                [key, value, self.timeout],
            )
            self.conn.commit()

    def get(self, key):
        with closing(self.conn.cursor()) as cur:
            cur.execute("DELETE FROM session WHERE timeout < unixepoch()")
            cur.execute(
                """
                UPDATE session
                SET timeout = unixepoch('now', format('%d seconds', ?))
                WHERE user = ?
                RETURNING password
                """,
                [self.timeout, key],
            )
            row = cur.fetchone()
            self.conn.commit()

            return None if row is None else row[0]

    def close(self):
        self.conn.close()


def _select_timeout_session():
    session_type = current_app.config["JWEBMAIL"]["READ_MAILS"]["SESSION_TYPE"]

    user = "jwebmail"
    passwd = current_app.config["JWEBMAIL"]["READ_MAILS"].get("SESSION_STORE_PASSWD")
    args = dict()
    db_name = current_app.config["JWEBMAIL"]["READ_MAILS"].get("SESSION_STORE_DB_NAME")
    if db_name:
        args["database"] = db_name

    if session_type == "REDIS":
        return RedisTimeoutSession(user, passwd, EXPIRATION_SEC)
    elif session_type == "MYSQL":
        return MysqlTimeoutSession(user, passwd, EXPIRATION_SEC, **args)
    elif session_type == "SQLITE":
        args.setdefault("database", "/var/local/lib/jwebmail/jwebmail.sqlite3")
        return SqliteTimeoutSession(user, passwd, EXPIRATION_SEC, **args)
    else:
        raise ValueError(f"unknown session_type {session_type!r}")


def _build_qma(username, password):
    authenticator = current_app.config["JWEBMAIL"]["READ_MAILS"]["AUTHENTICATOR"]
    backend = current_app.config["JWEBMAIL"]["READ_MAILS"]["BACKEND"]

    virt_users = current_app.config["JWEBMAIL"]["READ_MAILS"].get("VIRTUAL_USERS")
    if virt_users:
        _, domain = username.split("@")

        with open(virt_users, encoding="ASCII") as file:
            for virt_dom in file:
                dom, unix_user = virt_dom.rstrip().split(":")
                if dom == domain:
                    mailbox_user = unix_user
                    mailbox = path_join(pwd.getpwnam(unix_user).pw_dir, "users/")
                    break
            else:
                raise ValueError(f"unknown virtual domain {domain!r}")
    else:
        mailbox_user = current_app.config["JWEBMAIL"]["READ_MAILS"]["MAILBOX_USER"]
        mailbox = current_app.config["JWEBMAIL"]["READ_MAILS"]["MAILBOX"]

    return QMailAuthuser(
        username, password, backend, mailbox, mailbox_user, authenticator
    )


def login(username, password):
    try:
        _build_qma(username, password).open()
    except QMAuthError as err:
        if err.rc == 1:
            return False
        else:
            raise

    r = _select_timeout_session()
    r.set(username, password)
    r.close()
    user = JWebmailUser(username, password)
    login_user(user)
    return True


def load_user(username: str) -> JWebmailUser:
    r = _select_timeout_session()
    passwd = r.get(username)
    r.close()
    if passwd is None:
        return None
    return JWebmailUser(username, passwd)


def get_read_mails_logged_in():
    if "read_mails" in g:
        return g.read_mails

    qma = _build_qma(current_user.get_id(), current_user.password).open()
    g.read_mails = qma
    return qma