summaryrefslogtreecommitdiff
path: root/src/jwebmail/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/jwebmail/model')
-rw-r--r--src/jwebmail/model/read_mails.py128
1 files changed, 128 insertions, 0 deletions
diff --git a/src/jwebmail/model/read_mails.py b/src/jwebmail/model/read_mails.py
new file mode 100644
index 0000000..f82b601
--- /dev/null
+++ b/src/jwebmail/model/read_mails.py
@@ -0,0 +1,128 @@
+import json
+import shlex
+from subprocess import PIPE, Popen, TimeoutExpired
+from subprocess import run as subprocess_run
+
+
+class QMAuthError(Exception):
+ def __init__(self, msg, rc, response=None):
+ super().__init__(msg, rc, response)
+ self.msg = msg
+ self.rc = rc
+ self.response = response
+
+
+class QMailAuthuser:
+ def __init__(
+ self, username, password, prog, mailbox_path, virtual_user, authenticator
+ ):
+ self._username = username
+ self._password = password
+ self._prog = prog
+ self._mailbox_path = mailbox_path
+ self._virtual_user = virtual_user
+ self._authenticator = authenticator
+
+ def verify_user(self):
+ try:
+ completed_proc = subprocess_run(
+ f"{self._authenticator} true 3<&0",
+ input=f"{self._username}\0{self._password}\0\0".encode(),
+ shell=True,
+ timeout=2,
+ )
+ match completed_proc.returncode:
+ case 0:
+ return True
+ case 1:
+ return False
+ case n:
+ raise QMAuthError("authentication error", n)
+ except TimeoutExpired:
+ return False
+
+ def read_headers_for(self, folder, start, end, sort):
+ return self.build_and_run("list", [folder, start, end, sort])
+
+ def count(self, folder):
+ return self.build_and_run("count", [folder])
+
+ def show(self, folder, msgid):
+ return self.build_and_run("read", [folder, msgid])
+
+ def raw(self, folder, mid, path):
+ return self.build_and_run("raw", [folder, mid, path])
+
+ def search(self, pattern, folder):
+ return self.build_and_run("search", [pattern, folder])
+
+ def folders(self):
+ res = self.build_and_run("folders")
+ if isinstance(res, list):
+ return [""] + res
+ return res
+
+ def move(self, mid, from_f, to_f):
+ _resp = self.build_and_run("move", [mid, from_f, to_f])
+ return True
+
+ def remove(self, folder, msgid):
+ _resp = self.build_and_run("remove", [folder, msgid])
+ return True
+
+ def _build_arg(self, user_mail_addr, mode, args):
+ idx = user_mail_addr.find("@")
+ user_name = user_mail_addr[:idx]
+
+ return (
+ " ".join(
+ shlex.quote(str(x))
+ for x in [
+ self._authenticator,
+ self._prog,
+ self._mailbox_path,
+ self._virtual_user,
+ user_name,
+ mode,
+ *args,
+ ]
+ )
+ + " 3<&0"
+ )
+
+ def _read_qmauth(self, prog_and_args):
+ popen = Popen(prog_and_args, stdin=PIPE, stdout=PIPE, shell=True)
+
+ try:
+ inp, _ = popen.communicate(
+ f"{self._username}\0{self._password}\0\0".encode(), timeout=30
+ )
+ popen.wait(1)
+ except TimeoutExpired:
+ popen.terminate()
+
+ if popen.poll() is None:
+ popen.kill()
+ popen.poll()
+
+ rc = popen.returncode
+ if rc in [0, 3]:
+ try:
+ a, b, c = inp.partition(b"\n")
+ d = json.loads(a)
+ if b:
+ resp = dict(head=d, body=c.decode())
+ else:
+ resp = d
+ except json.JSONDecodeError:
+ raise QMAuthError("error decoding response", rc, inp)
+ if rc == 3:
+ raise QMAuthError("error reported by extractor", rc, resp)
+ else:
+ raise QMAuthError("got unsuccessful return code by qmail-authuser", rc, inp)
+
+ return resp
+
+ def build_and_run(self, mode, args=()):
+ prog_and_args = self._build_arg(self._username, mode, args)
+ return self._read_qmauth(prog_and_args)