import os from base64 import b64decode from socket import socketpair import varlink class QMAuthError(Exception): def __init__(self, rc): super().__init__(rc) self.rc = rc class QMailAuthuser: def __init__(self, prog, mailbox_path, virtual_user, authenticator): self._prog = prog self._mailbox_path = mailbox_path self._virtual_user = virtual_user self._authenticator = authenticator self._pid = None self._socket = None self._client = None self._connection = None def list_search(self, folder, bound, after, limit, sort, search): sort_val = dict() if sort[0] == "!": sort = sort[1:] sort_val["direction"] = "desc" else: sort_val["direction"] = "asc" match sort: case "date" | "sender" | "size": sort_val["parameter"] = sort case _: raise ValueError(f"invalid sort parameter {sort!r}") if bound is not None: param, mid = bound.split("_", 1) bound = {"param": param, "id": mid} req = self._connection.ListSearch( folder=folder, bound=bound, direction="after" if after else "before", limit=limit, sort=sort_val, search=search, ) return ( [ { "message_handle": lmh["mid"], "byte_size": lmh["byte_size"], "unread": lmh["unread"], "date_received": lmh["rec_date"], "head": self._mail_header(lmh["header"]), } for lmh in req["mail_heads"] ], req["first"], req["last"], ) def count(self, folder): resp = self._connection.Stats(folder=folder) return { "byte_size": resp["byte_size"], "total_mails": resp["mail_count"], "unread_mails": resp["unread_count"], } def show(self, folder, msgid): resp = self._connection.Show(folder=folder, mid=msgid) return { "head": self._mail_header(resp["mail"]["head"]), "body": self._mail_body(resp["mail"]["body"]), } def raw(self, folder, mid, path): resp = self._connection.Raw(folder=folder, mid=mid, path=path) return { "head": self._mime_header(resp["header"]), "body": b64decode(resp["body"]), } def folders(self): resp = self._connection.Folders() return list(resp["folders"]) + [""] def move(self, mid, from_f, to_f): self._connection.Move(mid=mid, from_folder=from_f, to_folder=to_f) return True def remove(self, folder, msgid): self._connection.Remove(folder=folder, mid=msgid) return True def add_folder(self, name): resp = self._connection.AddFolder(name=name) return resp["status"] @staticmethod def _address(addr): if not addr: return None a = {"address": addr["address"]} if addr.get("name"): a["display_name"] = addr["name"] return a def _mail_header(self, mail_head): h = mail_head return { "date": h["send_date"], "sender": self._address(h.get("sender")), "cc": [self._address(x) for x in h["cc"] if x], "bcc": [self._address(x) for x in h["bcc"] if x], "from": [self._address(x) for x in h["written_from"] if x], "reply_to": [self._address(x) for x in h["reply_to"] if x], "to": [self._address(x) for x in h["send_to"] if x], "subject": h["subject"], "comments": list(h["comments"]), "keywords": list(h["keywords"]), "mime": self._mime_header(h["mime"]), } @staticmethod def _mime_header(mime_head): mh = { "content_maintype": mime_head["mime_type"]["main_type"], "content_subtype": mime_head["mime_type"]["sub_type"], } match mime_head["content_dispo"]: case "inline": mh["content_disposition"] = "inline" case "attachment": mh["content_disposition"] = "attachment" case "none": mh["content_disposition"] = None case invalid: raise ValueError(f"invalid content disposition {invalid!r}") if "file_name" in mime_head: mh["filename"] = mime_head["file_name"] return mh def _mail_body(self, body): assert len(body) == 1 if "discrete" in body: return body["discrete"] elif "multipart" in body: res = {"parts": []} for mp in body["multipart"]["parts"]: r = {"head": self._mime_header(mp["mime_header"])} if mp["mime_header"]["content_dispo"] != "attachment": r["body"] = self._mail_body(mp["body"]) res["parts"].append(r) if pr := body["multipart"].get("preamble"): res["preamble"] = pr if ep := body["multipart"].get("epilogue"): res["epilogue"] = ep return res elif "mail" in body: return { "head": self._mail_header(body["mail"]["head"]), "body": self._mail_body(body["mail"]["body"]), } else: assert False def open(self, username, password): (rp, wp) = os.pipe() (sp, sc) = socketpair() cmdline = [self._authenticator, self._prog] if (pid := os.fork()) == 0: # child os.close(wp) sp.close() os.dup2(rp, 3) os.dup2(sc.fileno(), 4) os.environ["LISTEN_FDS"] = "2" os.environ["LISTEN_FDNAMES"] = "auth:varlink" os.environ["LISTEN_PID"] = str(os.getpid()) os.execvp(self._authenticator, cmdline) assert False sc.close() os.close(rp) os.write(wp, f"{username}\0{password}\0\0".encode()) os.close(wp) self._pid = pid self._socket = sp self._client = varlink.Client() try: self._connection = self._client.open( "de.jmhoffmann.jwebmail.mail-storage", connection=sp ) except ConnectionResetError: (pid, status) = os.waitpid(pid, os.WNOHANG) if pid != 0: raise QMAuthError(os.waitstatus_to_exitcode(status)) else: raise user = username[: username.index("@")] self._connection.Init( unix_user=self._virtual_user, mailbox_path=os.path.join(self._mailbox_path, user), ) return self def close(self): self._connection.close() self._socket.close() (pid, status) = os.waitpid(self._pid, 0) assert pid == self._pid rc = os.waitstatus_to_exitcode(status) if rc != 0: raise QMAuthError(rc) def __enter__(self): return self def __exit__(self, ex_type, ex_val, ex_tb): if ex_val is None: self.close() elif issubclass(ex_type, BrokenPipeError): (pid, _status) = os.waitpid(self._pid, 0) assert pid == self._pid return False