#!/usr/bin/env python3 """qmauth.py Extract delivers information about emails from a maildir. Runs with elevated privileges. This program is started by qmail-authuser with elevated privileges after a successful login. """ import email.parser import email.policy import logging import re import socket from base64 import b64encode from datetime import datetime from email.message import EmailMessage from itertools import islice from mailbox import Maildir, MaildirMessage, NoSuchMailboxError from os import environ, getpid, mkdir, path, setuid, stat from pathlib import Path from pwd import getpwnam from sys import exit as sysexit import varlink class MyMaildir(Maildir): def __init__(self, dirname, parent=None, *args, **kwargs): self.__path = Path(dirname) self.set_msgtype("MaildirMessage") super().__init__(dirname, *args, **kwargs) def set_msgtype(self, typ): if typ == "MaildirMessage": self._factory = None elif typ == "EmailMessage": self._factory = lambda mail_file: email.parser.BytesParser( policy=email.policy.default ).parse(mail_file) elif typ == "Maildir(EmailMessageHeader)": def factory(mail_file): """ Copy from implementation. """ name = Path(mail_file._file.name).name msg = MaildirMessage( email.parser.BytesHeaderParser(policy=email.policy.default).parse( mail_file ) ) if self.colon in name: msg.set_info(name.split(self.colon)[-1]) return msg self._factory = factory elif typ == "None": self._factory = lambda _file: MaildirMessage(None) else: raise ValueError(typ) def get_filename(self, mid): return self.__path / self._lookup(mid) def get_folder(self, folder): """ This is an internal copy of the implementation of Maildir. This is because Maildir was not designed with inheritance in mind and this method does hence not return an instance of itself but rather only and instance of Maildir. This override corrects this but can not anticipate additional state introduced in more derived classes. """ return type(self)( path.join(self._path, "." + folder), create=False, ) def _refresh(self): """ This override of internal method _refresh that strips out 'hidden' files. """ super()._refresh() rm = [r for r in self._toc if r.startswith(".")] for r in rm: del self._toc[r] service = varlink.Service( vendor="JWebmail", product="Mail Storage Interface", version="1", url="https://www.fehcom.de/cgit/jwebmail2", interface_dir=path.join(path.dirname(__file__), "../src/jwebmail/model/"), ) class InvalidUserError(varlink.VarlinkError): def __init__(self, unix_user): super().__init__( { "error": "de.jmhoffmann.jwebmail.mail-storage.InvalidUser", "parameters": { "unix_user": unix_user, }, } ) class NotInitializedError(varlink.VarlinkError): def __init__(self): super().__init__( {"error": "de.jmhoffmann.jwebmail.mail-storage.NotInitialized"} ) class InvalidMailboxError(varlink.VarlinkError): def __init__(self, path, not_a_mailbox, user_mismatch): super().__init__( { "error": "de.jmhoffmann.jwebmail.mail-storage.InvalidMailbox", "parameters": { "path": path, "not_a_mailbox": not_a_mailbox, "user_mismatch": user_mismatch, }, } ) class InvalidMIDError(varlink.VarlinkError): def __init__(self, folder, mid): super().__init__( { "error": "de.jmhoffmann.jwebmail.mail-storage.InvalidMID", "parameters": { "folder": folder, "mid": mid, }, } ) class InvalidPathInMailError(varlink.VarlinkError): def __init__(self, folder, mid, path): super().__init__( { "error": "de.jmhoffmann.jwebmail.mail-storage.InvalidPathInMail", "parameters": { "folder": folder, "mid": mid, "path": path, }, } ) @service.interface("de.jmhoffmann.jwebmail.mail-storage") class MailStorage: def __init__(self): self.maildir = None def Init(self, su, maildir): del environ["PATH"] try: uid = getpwnam(su).pw_uid if not uid: raise InvalidUserError(su) setuid(uid) if stat(maildir).st_uid != uid: raise InvalidMailboxError( maildir, not_a_mailbox=False, user_mismatch=True ) self.maildir = MyMaildir(maildir, create=False) except KeyError: raise InvalidUserError(su) except (FileNotFoundError, NoSuchMailboxError): raise InvalidMailboxError(maildir, not_a_mailbox=True, user_mismatch=False) def List(self, folder, start, end, sort): if self.maildir is None: raise NotInitializedError() assert 0 <= start <= end maildir = self.maildir if folder: maildir = maildir.get_folder(folder) maildir.set_msgtype("Maildir(EmailMessageHeader)") if start == end: return [] kfn, reverse = self._sort_mails(maildir, sort) msgs = list(maildir.items()) msgs.sort(key=kfn, reverse=reverse) msgs = msgs[start : min(len(msgs), end)] def _get_rcv_time(mid): return float(mid[: mid.index(".")]) items = [ { "mid": mid, "byte_size": path.getsize(maildir.get_filename(mid)), "unread": "S" not in msg.get_flags(), "rec_date": datetime.fromtimestamp(_get_rcv_time(mid)).isoformat(), "header": self._get_head_info(msg), } for mid, msg in msgs ] return {"mail_heads": items} def Stats(self, folder): if self.maildir is None: raise NotInitializedError() maildir = self.maildir if folder: maildir = maildir.get_folder(folder) maildir.set_msgtype("None") return dict( mail_count=len(maildir), unread_count=len([1 for m in maildir if "S" in m.get_flags()]), byte_size=sum( path.getsize(maildir.get_filename(mid)) for mid in maildir.keys() ), ) def Show(self, folder, mid): if self.maildir is None: raise NotInitializedError() maildir = self.maildir if folder: maildir = maildir.get_folder(folder) maildir.set_msgtype("EmailMessage") msg = maildir.get(mid, None) if not msg: raise InvalidMIDError(folder, mid) maildir.set_msgtype("MaildirMessage") maildir[mid].add_flag("S") return { "mail": dict( head=self._get_head_info(msg), body=self._get_body(msg), ) } def Raw(self, folder, mid, path): if self.maildir is None: raise NotInitializedError() maildir = self.maildir if folder: maildir = maildir.get_folder(folder) maildir.set_msgtype("EmailMessage") msg = maildir.get(mid, None) if not msg: raise InvalidMIDError(folder, mid) pth = [int(seg) for seg in path.split(".")] if path else [] h = dict(mime_type={"main_type": "message", "sub_type": "rfc822"}) b = msg for n in pth: mctype = h["mime_type"]["main_type"] if mctype == "multipart": try: res = next(islice(b, n, None)) except StopIteration: raise InvalidPathInMailError(folder, mid, pth) (h, b) = self._descent(res) elif mctype == "message": assert n == 0 (h, b) = self._descent(b) else: raise InvalidPathInMailError( f"can not descent into non multipart content type {mctype}" ) if hasattr(b, "__next__"): # can not stop at multipart section raise InvalidPathInMailError(folder=folder, mid=mid, path=pth) elif isinstance(b, str): b = b.encode() elif isinstance(b, EmailMessage): b = b.as_bytes() return dict(header=h, body=b64encode(b).decode("ASCII")) def Search(self, folder, pattern): if self.maildir is None: raise NotInitializedError() maildir = self.maildir if folder: maildir = maildir.get_folder(folder) maildir.set_msgtype("EmailMessage") res = [ dict(header=self._get_head_info(msg)) for msg in maildir.values() if self._matches(msg, pattern) ] return {"found": res} def Folders(self): if self.maildir is None: raise NotInitializedError() return {"folders": self.maildir.list_folders()} def Move(self, mid, from_folder, to_folder): if self.maildir is None: raise NotInitializedError() maildir = self.maildir if from_folder: maildir = maildir.get_folder(from_folder) fname = Path(maildir.get_filename(mid)) assert to_folder in self.maildir.list_folders() or to_folder == "" sep = -2 if not from_folder else -3 if to_folder: res = fname.parts[:sep] + ("." + to_folder,) + fname.parts[-2:] else: res = fname.parts[:sep] + fname.parts[-2:] fname.rename(Path(*res)) def Remove(self, folder, mid): if self.maildir is None: raise NotInitializedError() maildir = self.maildir if folder: maildir = maildir.get_folder(folder) maildir.set_msgtype("MaildirMessage") maildir[mid].add_flag("T") def AddFolder(self, name): if self.maildir is None: raise NotInitializedError() name = path.join( self.maildir._path, "." + name.translate(str.maketrans("/", ".")) ) if path.isdir(name): return {"status": "skipped"} mkdir(name) mkdir(path.join(name, "cur")) mkdir(path.join(name, "new")) mkdir(path.join(name, "tmp")) return {"status": "created"} @staticmethod def _adr(addrs): if addrs is None: return [] ret = [] for addr in addrs.addresses: struct = {"address": addr.addr_spec} if addr.display_name: struct["name"] = addr.display_name ret.append(struct) return ret @staticmethod def _get_mime_head_info(msg): mh = dict( mime_type={ "main_type": msg.get_content_maintype(), "sub_type": msg.get_content_subtype(), } ) if (cd := msg.get_content_disposition()) == "inline": mh["content_dispo"] = "inline" elif cd == "attachment": mh["content_dispo"] = "attachment" elif cd is None: mh["content_dispo"] = "none" else: assert False if fn := msg.get_filename(): mh["file_name"] = fn return mh def _get_head_info(self, msg): mh = dict( send_date=msg["date"].datetime.isoformat(), written_from=self._adr(msg["from"]), reply_to=self._adr(msg["reply-to"]), send_to=self._adr(msg["to"]), cc=self._adr(msg["cc"]), bcc=self._adr(msg["bcc"]), subject=msg["subject"], comments=msg["comments"], keywords=msg["keywords"], mime=self._get_mime_head_info(msg), ) if s := self._adr(msg["sender"]): mh.sender = s[0] return mh def _get_body(self, mail): if not mail.is_multipart(): if mail.get_content_maintype() == "text": return dict(discrete=mail.get_content()) else: ret = mail.get_content() if ret.isascii(): return dict(discrete=ret.decode(encoding="ASCII")) elif len(ret) <= 128 * 1024: return dict(discrete=b64encode(ret).decode(encoding="ASCII")) else: raise ValueError( "non attachment part too large (>512kB)", size=len(ret) ) if (mctype := mail.get_content_maintype()) == "message": msg = mail.get_content() return dict( mail=dict(head=self._get_head_info(msg), body=self._get_body(msg)) ) elif mctype == "multipart": ret = dict( preamble=mail.preamble, parts=[], epilogue=mail.epilogue, ) for part in mail.iter_parts(): head = self._get_mime_head_info(part) if head["content_dispo"] != "attachment": body = self._get_body(part) else: body = None ret["parts"].append(dict(mime_header=head, body=body)) return dict(multipart=ret) else: raise ValueError(f"unknown major content-type {mctype!r}") def _descent(self, xx): head = self._get_mime_head_info(xx) if (mctype := head["mime_type"]["main_type"]) == "message": body = xx.get_content() elif mctype == "multipart": body = xx.iter_parts() else: body = xx.get_content() return head, body @staticmethod def _matches(m, pattern): if m.is_multipart(): return any( 1 for part in m.body.parts if re.search(pattern, part.decoded()) or re.search(pattern, part.subject) ) return re.search(pattern, m.body.decoded()) or re.search(pattern, m.subject) @staticmethod def _sort_mails(f, sort): match sort["direction"]: case "asc": reverse = False case "desc": pass reverse = True case sort_direct: raise ValueError(f"unknown sort direction {sort_direct!r}") def _sort_by_sender(midmsg): _, msg = midmsg if len(addrs := msg["from"].addresses) == 1: return addrs[0].addr_spec else: return msg["sender"].address.addr_spec def by_rec_date(midmsg): return float(re.match(r"\d+\.\d+", midmsg[0], re.ASCII)[0]) match sort["parameter"]: case "date": keyfn = by_rec_date case "sender": keyfn = _sort_by_sender case "subject": # fmt: off def keyfn(midmsg): return midmsg[1]["subject"] # fmt: on case "size": # fmt: off def keyfn(midmsg): return path.getsize(f.get_filename(midmsg[0])) # fmt: on case sort_param: logging.warning("unknown sort-verb %r", sort_param) reverse = False keyfn = by_rec_date return keyfn, reverse class RequestHandler(varlink.RequestHandler): service = service def main(): try: logging.basicConfig( level="INFO", format="%(levelname)s:" + str(getpid()) + ":%(message)s", ) if environ["LISTEN_FDS"] == "1": sok = socket.fromfd(3, socket.AF_UNIX, socket.SOCK_STREAM) RequestHandler(sok, None, None) return types = environ["LISTEN_FDNAMES"].split(":") assert len(types) == int(environ["LISTEN_FDS"]) for i, typ in enumerate(types): if typ == "varlink": sok = socket.fromfd(3 + i, socket.AF_UNIX, socket.SOCK_STREAM) RequestHandler(sok, None, None) sok.close() except Exception: logging.exception("qmauth.py error") sysexit(4) if __name__ == "__main__": main()