summaryrefslogtreecommitdiff
path: root/src/jwebmail/model/read_mails.py
blob: b322a16f55db73109e7caac4c9d42e8bd9becacb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import json
import shlex
from subprocess import PIPE, Popen, TimeoutExpired
from subprocess import run as subprocess_run


class QMAuthError(Exception):
    """Error raised when a call to the authenticator/extractor fails.

    Attributes:
        msg: Human-readable description of the failure.
        rc: Return code associated with the failure.
        response: Optional payload (raw output or decoded response) that
            accompanied the failure; ``None`` when not supplied.

    The same three values are forwarded to ``Exception`` and are therefore
    also available through ``args``.
    """

    def __init__(self, msg, rc, response=None):
        Exception.__init__(self, msg, rc, response)
        self.msg, self.rc, self.response = msg, rc, response


class QMailAuthuser:
    """Run mailbox operations by invoking a helper program via an authenticator.

    Every operation builds a shell command of the form
    ``<authenticator> <prog> <mailbox_path> <virtual_user> <user> <mode> [args...] 3<&0``
    and feeds ``username\\0password\\0\\0`` to it on stdin. The ``3<&0``
    redirection makes the credentials readable on file descriptor 3 as well.
    (NOTE(review): this looks like the checkpassword calling convention --
    confirm against the authenticator's documentation.)
    """

    def __init__(
        self, username, password, prog, mailbox_path, virtual_user, authenticator
    ):
        """Store the credentials and command components used by every call."""
        self._username = username
        self._password = password
        self._prog = prog
        self._mailbox_path = mailbox_path
        self._virtual_user = virtual_user
        self._authenticator = authenticator

    def _credentials(self):
        """Return the NUL-delimited credential payload written to the child."""
        return f"{self._username}\0{self._password}\0\0".encode()

    def verify_user(self):
        """Check the stored credentials against the authenticator.

        Returns:
            ``True`` when the authenticator exits with 0, ``False`` when it
            exits with 1 or does not finish within 2 seconds.

        Raises:
            QMAuthError: for any other exit status.
        """
        try:
            completed_proc = subprocess_run(
                f"{self._authenticator} true 3<&0",
                input=self._credentials(),
                shell=True,
                timeout=2,
            )
        except TimeoutExpired:
            return False

        rc = completed_proc.returncode
        if rc == 0:
            return True
        if rc == 1:
            return False
        raise QMAuthError("authentication error", rc)

    def read_headers_for(self, folder, start, end, sort):
        """Run the ``list`` mode for *folder* with the given range and sort."""
        return self.build_and_run("list", [folder, start, end, sort])

    def count(self, folder):
        """Run the ``count`` mode for *folder*."""
        return self.build_and_run("count", [folder])

    def show(self, folder, msgid):
        """Run the ``read`` mode for message *msgid* in *folder*."""
        return self.build_and_run("read", [folder, msgid])

    def raw(self, folder, mid, path):
        """Run the ``raw`` mode for message *mid* in *folder* with *path*."""
        return self.build_and_run("raw", [folder, mid, path])

    def search(self, pattern, folder):
        """Run the ``search`` mode with *pattern* in *folder*."""
        return self.build_and_run("search", [pattern, folder])

    def folders(self):
        """Run the ``folders`` mode.

        When the response is a list, an empty string is prepended to it;
        any other response is returned unchanged.
        """
        res = self.build_and_run("folders")
        if isinstance(res, list):
            return [""] + res
        return res

    def move(self, mid, from_f, to_f):
        """Run the ``move`` mode; return ``True`` unless the call raises."""
        self.build_and_run("move", [mid, from_f, to_f])
        return True

    def remove(self, folder, msgid):
        """Run the ``remove`` mode; return ``True`` unless the call raises."""
        self.build_and_run("remove", [folder, msgid])
        return True

    def _build_arg(self, user_mail_addr, mode, args):
        """Build the shell command line for *mode*.

        Args:
            user_mail_addr: Address whose part before the first ``@`` is
                passed to the helper as the user name. An address without
                ``@`` is used in full.
            mode: Name of the operation to run.
            args: Extra positional arguments for the operation.

        Returns:
            A single shell command string; every word is quoted with
            ``shlex.quote`` and ``" 3<&0"`` is appended.
        """
        # partition() keeps the whole string when there is no "@"; the
        # previous find()-based slice dropped the last character in that case.
        user_name = user_mail_addr.partition("@")[0]
        words = [
            self._authenticator,
            self._prog,
            self._mailbox_path,
            self._virtual_user,
            user_name,
            mode,
            *args,
        ]
        return " ".join(shlex.quote(str(word)) for word in words) + " 3<&0"

    @staticmethod
    def _stop_process(popen):
        """Stop a timed-out child and return whatever output could be read."""
        # Escalate from SIGTERM to SIGKILL, giving each a second to take effect.
        for stop in (popen.terminate, popen.kill):
            stop()
            try:
                output, _ = popen.communicate(timeout=1)
            except TimeoutExpired:
                continue
            return output
        # The pipe is presumably still held open elsewhere (e.g. by a
        # grandchild of the shell); make sure the return code is collected.
        popen.poll()
        return b""

    def _read_qmauth(self, prog_and_args):
        """Run *prog_and_args* through the shell and decode its response.

        The first output line is parsed as JSON. When the output contains a
        newline, the result is ``{"head": <first line>, "body": <rest>}``;
        otherwise the decoded JSON value itself is returned.

        Raises:
            QMAuthError: if the exit status is neither 0 nor 3 (including
                the case where the child had to be stopped after the 30
                second timeout), if the output cannot be decoded, or if the
                exit status is 3 (the decoded response is attached).
        """
        popen = Popen(prog_and_args, stdin=PIPE, stdout=PIPE, shell=True)
        try:
            output, _ = popen.communicate(self._credentials(), timeout=30)
        except TimeoutExpired:
            output = self._stop_process(popen)

        rc = popen.returncode
        if rc not in (0, 3):
            raise QMAuthError(
                "got unsuccessful return code by qmail-authuser", rc, output
            )

        try:
            head, sep, body = output.partition(b"\n")
            parsed = json.loads(head)
            resp = dict(head=parsed, body=body.decode()) if sep else parsed
        except (json.JSONDecodeError, UnicodeDecodeError) as err:
            raise QMAuthError("error decoding response", rc, output) from err

        if rc == 3:
            raise QMAuthError("error reported by extractor", rc, resp)
        return resp

    def build_and_run(self, mode, args=()):
        """Build the command for *mode* with *args*, run it, return the response."""
        prog_and_args = self._build_arg(self._username, mode, args)
        return self._read_qmauth(prog_and_args)