1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
import json
import shlex
from subprocess import PIPE, Popen, TimeoutExpired
from subprocess import run as subprocess_run
class QMAuthError(Exception):
def __init__(self, msg, rc, response=None):
super().__init__(msg, rc, response)
self.msg = msg
self.rc = rc
self.response = response
class QMailAuthuser:
def __init__(
self, username, password, prog, mailbox_path, virtual_user, authenticator
):
self._username = username
self._password = password
self._prog = prog
self._mailbox_path = mailbox_path
self._virtual_user = virtual_user
self._authenticator = authenticator
def verify_user(self):
try:
completed_proc = subprocess_run(
f"{self._authenticator} true 3<&0",
shell=True,
timeout=2,
input=f"{self._username}\0{self._password}\0\0".encode(),
)
if completed_proc.returncode == 0:
return True
elif completed_proc.returncode == 1:
return False
else:
raise QMAuthError("authentication error", completed_proc.returncode)
except TimeoutExpired:
return False
def read_headers_for(self, folder, start, end, sort):
return self.build_and_run("list", [folder, start, end, sort])
def count(self, folder):
return self.build_and_run("count", [folder])
def show(self, folder, msgid):
return self.build_and_run("read", [folder, msgid])
def raw(self, folder, mid, path):
return self.build_and_run("raw", [folder, mid, path])
def search(self, pattern, folder):
return self.build_and_run("search", [pattern, folder])
def folders(self):
res = self.build_and_run("folders")
if isinstance(res, list):
return [""] + res
return res
def move(self, mid, from_f, to_f):
_resp = self.build_and_run("move", [mid, from_f, to_f])
return True
def remove(self, folder, msgid):
_resp = self.build_and_run("remove", [folder, msgid])
return True
def _build_arg(self, user_mail_addr, mode, args):
idx = user_mail_addr.find("@")
user_name = user_mail_addr[:idx]
return (
" ".join(
shlex.quote(str(x))
for x in [
self._authenticator,
self._prog,
self._mailbox_path,
self._virtual_user,
user_name,
mode,
*args,
]
)
+ " 3<&0"
)
def _read_qmauth(self, prog_and_args):
popen = Popen(prog_and_args, stdin=PIPE, stdout=PIPE, shell=True)
try:
inp, _ = popen.communicate(
f"{self._username}\0{self._password}\0\0".encode(), timeout=30
)
popen.wait(1)
except TimeoutExpired:
popen.terminate()
if popen.poll() is None:
popen.kill()
popen.poll()
rc = popen.returncode
if rc in [0, 3]:
try:
a, b, c = inp.partition(b"\n")
d = json.loads(a)
if b:
resp = dict(head=d, body=c.decode())
else:
resp = d
except json.JSONDecodeError:
raise QMAuthError("error decoding response", rc, inp)
if rc == 3:
raise QMAuthError("error reported by extractor", rc, resp)
else:
raise QMAuthError("got unsuccessful return code by qmail-authuser", rc, inp)
return resp
def build_and_run(self, mode, args=()):
prog_and_args = self._build_arg(self._username, mode, args)
return self._read_qmauth(prog_and_args)
|