From 2cf2a68bd1c25d8fe4f3126f40bd57982cc6b2a4 Mon Sep 17 00:00:00 2001 From: "Jannis M. Hoffmann" Date: Sun, 3 Dec 2023 19:22:12 +0100 Subject: initial commit --- src/jwebmail/model/read_mails.py | 128 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 src/jwebmail/model/read_mails.py (limited to 'src/jwebmail/model') diff --git a/src/jwebmail/model/read_mails.py b/src/jwebmail/model/read_mails.py new file mode 100644 index 0000000..f82b601 --- /dev/null +++ b/src/jwebmail/model/read_mails.py @@ -0,0 +1,128 @@ +import json +import shlex +from subprocess import PIPE, Popen, TimeoutExpired +from subprocess import run as subprocess_run + + +class QMAuthError(Exception): + def __init__(self, msg, rc, response=None): + super().__init__(msg, rc, response) + self.msg = msg + self.rc = rc + self.response = response + + +class QMailAuthuser: + def __init__( + self, username, password, prog, mailbox_path, virtual_user, authenticator + ): + self._username = username + self._password = password + self._prog = prog + self._mailbox_path = mailbox_path + self._virtual_user = virtual_user + self._authenticator = authenticator + + def verify_user(self): + try: + completed_proc = subprocess_run( + f"{self._authenticator} true 3<&0", + input=f"{self._username}\0{self._password}\0\0".encode(), + shell=True, + timeout=2, + ) + match completed_proc.returncode: + case 0: + return True + case 1: + return False + case n: + raise QMAuthError("authentication error", n) + except TimeoutExpired: + return False + + def read_headers_for(self, folder, start, end, sort): + return self.build_and_run("list", [folder, start, end, sort]) + + def count(self, folder): + return self.build_and_run("count", [folder]) + + def show(self, folder, msgid): + return self.build_and_run("read", [folder, msgid]) + + def raw(self, folder, mid, path): + return self.build_and_run("raw", [folder, mid, path]) + + def search(self, pattern, folder): + return self.build_and_run("search", [pattern, folder]) + + def folders(self): + res = self.build_and_run("folders") + if isinstance(res, list): + return [""] + res + return res + + def move(self, mid, from_f, to_f): + _resp = self.build_and_run("move", [mid, from_f, to_f]) + return True + + def remove(self, folder, msgid): + _resp = self.build_and_run("remove", [folder, msgid]) + return True + + def _build_arg(self, user_mail_addr, mode, args): + idx = user_mail_addr.find("@") + user_name = user_mail_addr[:idx] + + return ( + " ".join( + shlex.quote(str(x)) + for x in [ + self._authenticator, + self._prog, + self._mailbox_path, + self._virtual_user, + user_name, + mode, + *args, + ] + ) + + " 3<&0" + ) + + def _read_qmauth(self, prog_and_args): + popen = Popen(prog_and_args, stdin=PIPE, stdout=PIPE, shell=True) + + try: + inp, _ = popen.communicate( + f"{self._username}\0{self._password}\0\0".encode(), timeout=30 + ) + popen.wait(1) + except TimeoutExpired: + popen.terminate() + + if popen.poll() is None: + popen.kill() + popen.poll() + + rc = popen.returncode + if rc in [0, 3]: + try: + a, b, c = inp.partition(b"\n") + d = json.loads(a) + if b: + resp = dict(head=d, body=c.decode()) + else: + resp = d + except json.JSONDecodeError: + raise QMAuthError("error decoding response", rc, inp) + if rc == 3: + raise QMAuthError("error reported by extractor", rc, resp) + else: + raise QMAuthError("got unsuccessful return code by qmail-authuser", rc, inp) + + return resp + + def build_and_run(self, mode, args=()): + prog_and_args = self._build_arg(self._username, mode, args) + return self._read_qmauth(prog_and_args) -- cgit v1.2.3