import shlex import os from subprocess import PIPE, Popen, TimeoutExpired from subprocess import run as subprocess_run import jwebmail.model.jwebmail_pb2 as pb2 class QMAuthError(Exception): def __init__(self, msg, rc, response=None): super().__init__(msg, rc, response) self.msg = msg self.rc = rc self.response = response class QMailAuthuser: def __init__( self, username, password, prog, mailbox_path, virtual_user, authenticator ): self._username = username self._password = password self._prog = prog self._mailbox_path = mailbox_path self._virtual_user = virtual_user self._authenticator = authenticator def verify_user(self): try: completed_proc = subprocess_run( f"{self._authenticator} true 3<&0", input=f"{self._username}\0{self._password}\0\0".encode(), shell=True, timeout=2, check=False, ) if completed_proc.returncode == 0: return True elif completed_proc.returncode == 1: return False else: raise QMAuthError("authentication error", completed_proc.returncode) except TimeoutExpired: return False def read_headers_for(self, folder, start, end, sort): req = pb2.ListReq(folder=folder, start=start, end=end, sort=sort) resp = self.build_and_run("list", req.SerializeToString()) r = pb2.ListResp() r.ParseFromString(resp) return [ { "message_handle": lmh.mid, "byte_size": lmh.byte_size, "unread": lmh.unread, "date_received": lmh.rec_date, "head": self._mail_header(lmh.header), } for lmh in r.mail_heads ] def count(self, folder): req = pb2.StatsReq(folder=folder) resp = self.build_and_run("count", req.SerializeToString()) r = pb2.StatsResp() r.ParseFromString(resp) return { "byte_size": r.byte_size, "total_mails": r.mail_count, "unread_mails": r.unread_count, } # def show(self, folder, msgid): req = pb2.ShowReq(folder=folder, mid=msgid) resp = self.build_and_run("read", req.SerializeToString()) r = pb2.ShowResp() r.ParseFromString(resp) return { "head": self._mail_header(r.mail.head), "body": self._mail_body(r.mail.body), } def raw(self, folder, mid, path): req = pb2.RawReq(folder=folder, mid=mid, path=path) resp = self.build_and_run("raw", req.SerializeToString()) r = pb2.RawResp() r.ParseFromString(resp) return {"head": self._mime_header(r.header), "body": r.body} # def search(self, pattern, folder): req = pb2.SearchReq(folder=folder, pattern=pattern) resp = self.build_and_run("search", req.SerializeToString()) r = pb2.SearchResp() r.ParseFromString(resp) return r def folders(self): resp = self.build_and_run("folders", pb2.FoldersReq().SerializeToString()) result = pb2.FoldersResp() result.ParseFromString(resp) return list(result.folders) + [""] def move(self, mid, from_f, to_f): req = pb2.MoveReq(mid=mid, from_f=from_f, to_f=to_f) resp = self.build_and_run("move", req.SerializeToString()) r = pb2.MoveResp() r.ParseFromString(resp) return True def remove(self, folder, msgid): req = pb2.RemoveReq(folder=folder, mid=msgid) resp = self.build_and_run("remove", req.SerializeToString()) r = pb2.RemoveResp() r.ParseFromString(resp) return True @staticmethod def _address(addr): if not addr: return None a = {"address": addr.address} if addr.HasField("name"): a["display_name"] = addr.name return a @classmethod def _mail_header(cls, mail_head): h = mail_head return { "date": h.send_date, "sender": cls._address(h.sender), "cc": [cls._address(x) for x in h.cc if x], "bcc": [cls._address(x) for x in h.bcc if x], "from": [cls._address(x) for x in h.written_from if x], "reply_to": [cls._address(x) for x in h.reply_to if x], "to": [cls._address(x) for x in h.send_to if x], "subject": h.subject, "comments": h.comments, "keywords": h.keywords, "mime": cls._mime_header(h.mime), } @staticmethod def _mime_header(mime_head): mh = { "content_maintype": mime_head.maintype, "content_subtype": mime_head.subtype, } if ( mime_head.contentdispo == pb2.MIMEHeader.ContentDisposition.CONTENT_DISPOSITION_NONE ): mh["content_disposition"] = None elif ( mime_head.contentdispo == pb2.MIMEHeader.ContentDisposition.CONTENT_DISPOSITION_INLINE ): mh["content_disposition"] = "inline" elif ( mime_head.contentdispo == pb2.MIMEHeader.ContentDisposition.CONTENT_DISPOSITION_ATTACHMENT ): mh["content_disposition"] = "attachment" if mime_head.HasField("file_name"): mh["filename"] = mime_head.file_name return mh @classmethod def _mail_body(cls, body): if body.WhichOneof("Body") == "discrete": return body.discrete elif body.WhichOneof("Body") == "multipart": res = {"parts": []} for mp in body.multipart.parts: r = {"head": cls._mime_header(mp.mime_header)} if ( mp.mime_header.contentdispo != pb2.MIMEHeader.ContentDisposition.CONTENT_DISPOSITION_ATTACHMENT ): r["body"] = cls._mail_body(mp.body) res["parts"].append(r) if body.multipart.HasField("preamble"): res["preamble"] = body.multipart.preamble if body.multipart.HasField("epilogue"): res["epilogue"] = body.multipart.epilogue return res elif body.WhichOneof("Body") == "mail": return { "head": cls._mail_header(body.mail.head), "body": cls._mail_body(body.mail.body), } else: assert False def _build_arg(self, user_mail_addr, mode, rp): idx = user_mail_addr.find("@") user_name = user_mail_addr[:idx] cmdline = " ".join( shlex.quote(str(x)) for x in ( self._authenticator, self._prog, self._mailbox_path, self._virtual_user, user_name, mode, ) ) if rp != 3: cmdline += f" 3<&{rp}-" return cmdline def _read_qmauth(self, cmd, args, rp, wp): popen = Popen(cmd, stdin=PIPE, stdout=PIPE, pass_fds=[rp], shell=True, bufsize=0) os.close(rp) os.write(wp, f"{self._username}\0{self._password}\0\0".encode()) os.close(wp) r = popen.stdout.read(10) if popen.poll(): raise QMAuthError("qmail-authuser unexpectedly exited", popen.returncode, r) assert r == b"OPEN\n" popen.stdin.write(args) popen.stdin.close() inp = popen.stdout.readall() if popen.poll() is None: popen.kill() popen.poll() rc = popen.returncode if rc == 0: return inp elif rc == 3: raise QMAuthError("error reported by extractor", 3, inp) else: raise QMAuthError("got unsuccessful return code by qmail-authuser", rc, inp) def build_and_run(self, mode, args): (rp, wp) = os.pipe() cmd = self._build_arg(self._username, mode, rp) return self._read_qmauth(cmd, args, rp, wp)