import json import shlex from subprocess import PIPE, Popen, TimeoutExpired from subprocess import run as subprocess_run class QMAuthError(Exception): def __init__(self, msg, rc, response=None): super().__init__(msg, rc, response) self.msg = msg self.rc = rc self.response = response class QMailAuthuser: def __init__( self, username, password, prog, mailbox_path, virtual_user, authenticator ): self._username = username self._password = password self._prog = prog self._mailbox_path = mailbox_path self._virtual_user = virtual_user self._authenticator = authenticator def verify_user(self): try: completed_proc = subprocess_run( f"{self._authenticator} true 3<&0", input=f"{self._username}\0{self._password}\0\0".encode(), shell=True, timeout=2, ) if completed_proc.returncode == 0: return True elif completed_proc.returncode == 1: return False else: raise QMAuthError("authentication error", completed_proc.returncode) except TimeoutExpired: return False def read_headers_for(self, folder, start, end, sort): return self.build_and_run("list", [folder, start, end, sort]) def count(self, folder): return self.build_and_run("count", [folder]) def show(self, folder, msgid): return self.build_and_run("read", [folder, msgid]) def raw(self, folder, mid, path): return self.build_and_run("raw", [folder, mid, path]) def search(self, pattern, folder): return self.build_and_run("search", [pattern, folder]) def folders(self): res = self.build_and_run("folders") if isinstance(res, list): return [""] + res return res def move(self, mid, from_f, to_f): _resp = self.build_and_run("move", [mid, from_f, to_f]) return True def remove(self, folder, msgid): _resp = self.build_and_run("remove", [folder, msgid]) return True def _build_arg(self, user_mail_addr, mode, args): idx = user_mail_addr.find("@") user_name = user_mail_addr[:idx] return ( " ".join( shlex.quote(str(x)) for x in [ self._authenticator, self._prog, self._mailbox_path, self._virtual_user, user_name, mode, *args, ] ) + " 3<&0" ) def _read_qmauth(self, prog_and_args): popen = Popen(prog_and_args, stdin=PIPE, stdout=PIPE, shell=True) try: inp, _ = popen.communicate( f"{self._username}\0{self._password}\0\0".encode(), timeout=30 ) popen.wait(1) except TimeoutExpired: popen.terminate() if popen.poll() is None: popen.kill() popen.poll() rc = popen.returncode if rc in [0, 3]: try: a, b, c = inp.partition(b"\n") d = json.loads(a) if b: resp = dict(head=d, body=c) else: resp = d except json.JSONDecodeError: raise QMAuthError("error decoding response", rc, inp) if rc == 3: raise QMAuthError("error reported by extractor", rc, resp) else: raise QMAuthError("got unsuccessful return code by qmail-authuser", rc, inp) return resp def build_and_run(self, mode, args=()): prog_and_args = self._build_arg(self._username, mode, args) return self._read_qmauth(prog_and_args)