diff options
Diffstat (limited to 'script')
-rwxr-xr-x | script/extract.py | 479 | ||||
-rwxr-xr-x | script/testauthenticator.py | 30 |
2 files changed, 509 insertions, 0 deletions
diff --git a/script/extract.py b/script/extract.py new file mode 100755 index 0000000..e771110 --- /dev/null +++ b/script/extract.py @@ -0,0 +1,479 @@ +#!/usr/bin/env python3 + +"""qmauth.py + +Extract delivers information about emails from a maildir. +Runs with elevated privileges. + +This program is started by qmail-authuser with elevated privileges after +a successful login. +Input directives are provided as command line arguments. +Output is delivered via STDOUT as json and log information via STDERR. + +Exit codes:: + + 1 reserved + 2 reserved + 3 operational error (error message in output) + 4 user error (no output) + 5 issue switching to user (no output) + 110 reserved + 111 reserved +""" + +import email.parser +import email.policy +import json +import logging +import re +from argparse import ArgumentParser +from base64 import b64encode +from datetime import datetime +from glob import glob +from itertools import islice +from mailbox import Maildir, MaildirMessage +from os import environ, getpid, path, setuid +from pathlib import Path +from pwd import getpwnam +from sys import exit as sysexit +from sys import stdout + + +class MyMaildir(Maildir): + def __init__(self, dirname, *args, **kwargs): + self.__path = dirname + super().__init__(dirname, *args, **kwargs) + + def get_filename(self, mid): + p_cur = glob(path.join(self.__path, "cur", mid + "*")) + p_new = glob(path.join(self.__path, "new", mid + "*")) + res = p_cur + p_new + if len(res) != 1: + raise LookupError( + f"could not uniquely identify file for mail-id {mid!r}", mid + ) + return res[0] + + def get_folder(self, folder): + # copy from internal implementation + return MyMaildir( + path.join(self._path, "." + folder), + factory=self._factory, + create=False, + ) + + def _refresh(self): + super()._refresh() + for r in list(k for k in self._toc if k.startswith(".")): + del self._toc[r] + + +class QMAuthError(Exception): + def __init__(self, msg, **args): + self.msg = msg + self.info = args + + +def _adr(addrs): + if addrs is None: + return None + return [ + {"address": addr.addr_spec, "display_name": addr.display_name} + for addr in addrs.addresses + ] + + +def _get_rcv_time(mid): + idx = mid.find(".") + assert idx >= 0 + return float(mid[:idx]) + + +def startup(maildir, su, user, mode): + del environ["PATH"] + + netfehcom_uid = getpwnam(su).pw_uid + if not netfehcom_uid: + logging.error("user must not be root") + sysexit(5) + try: + setuid(netfehcom_uid) + except OSError: + logging.exception("error setting uid") + sysexit(5) + + def create_messages(mail_file): + if mode == count_mails: + msg = MaildirMessage(None) + elif mode == list_mails: + msg = MaildirMessage( + email.parser.BytesHeaderParser(policy=email.policy.default).parse( + mail_file + ) + ) + else: + msg = email.parser.BytesParser(policy=email.policy.default).parse(mail_file) + + return msg + + return MyMaildir( + maildir / user, + create=False, + factory=create_messages, + ) + + +def _sort_by_sender(midmsg): + _, msg = midmsg + + if len(addrs := msg["from"].addresses) == 1: + return addrs[0].addr_spec + else: + return msg["sender"].address.addr_spec + + +def _sort_mails(f, sort): + reverse = False + if sort.startswith("!"): + reverse = True + sort = sort[1:] + + def by_rec_date(midmsg): + return float(re.match(r"\d+\.\d+", midmsg[0], re.ASCII)[0]) + + if sort == "date": + keyfn = by_rec_date + elif sort == "sender": + keyfn = _sort_by_sender + elif sort == "subject": + keyfn = lambda midmsg: midmsg[1]["subject"] + elif sort == "size": + keyfn = lambda midmsg: path.getsize(f.get_filename(midmsg[0])) + elif sort == "": + keyfn = by_rec_date + else: + logging.warning("unknown sort-verb %r", sort) + reverse = False + keyfn = by_rec_date + + return keyfn, reverse + + +def _get_mime_head_info(msg): + return { + "content_maintype": msg.get_content_maintype(), + "content_subtype": msg.get_content_subtype(), + "content_disposition": msg.get_content_disposition(), + "filename": msg.get_filename(), + } + + +def _get_head_info(msg): + return { + "date": msg["date"].datetime.isoformat(), + "from": _adr(msg["from"]), + "sender": _adr(msg["sender"]), + "reply_to": _adr(msg["reply-to"]), + "to": _adr(msg["to"]), + "cc": _adr(msg["cc"]), + "bcc": _adr(msg["bcc"]), + "subject": msg["subject"], + "comments": msg["comments"], + "keywords": msg["keywords"], + "mime": _get_mime_head_info(msg), + } + + +def list_mails(f, start, end, sortby, folder): + assert 0 <= start <= end + + if folder: + f = f.get_folder(folder) + + if start == end: + return [] + + kfn, reverse = _sort_mails(f, sortby) + msgs = list(f.items()) + msgs.sort(key=kfn, reverse=reverse) + msgs = msgs[start : min(len(msgs), end)] + + return [ + { + "message_handle": mid, + "byte_size": path.getsize(f.get_filename(mid)), + "unread": "S" in msg.get_flags(), + "date_received": datetime.fromtimestamp(_get_rcv_time(mid)).isoformat(), + "head": _get_head_info(msg), + } + for mid, msg in msgs + ] + + +def count_mails(f, subfolder): + if subfolder: + f = f.get_folder(subfolder) + + return { + "total_mails": len(f), + "byte_size": sum(path.getsize(f.get_filename(mid)) for mid in f.keys()), + "unread_mails": len([1 for m in f if "S" in m.get_flags()]), + } + + +def _get_body(mail): + if not mail.is_multipart(): + if mail.get_content_maintype() == "text": + return mail.get_content() + else: + ret = mail.get_content() + if ret.isascii(): + return ret.decode(encoding="ascii") + elif len(ret) <= 128 * 1024: + return b64encode(ret).decode(encoding="ascii") + else: + raise QMAuthError( + "non attachment part too large (>512kB)", size=len(ret) + ) + + if (mctype := mail.get_content_maintype()) == "message": + msg = mail.get_content() + return { + "head": _get_head_info(msg), + "body": _get_body(msg), + } + elif mctype == "multipart": + ret = { + "preamble": mail.preamble, + "parts": [], + "epilogue": mail.epilogue, + } + for part in mail.iter_parts(): + head = _get_mime_head_info(part) + if head["content_disposition"] != "attachment": + body = _get_body(part) + else: + body = None + ret["parts"].append( + { + "head": head, + "body": body, + } + ) + return ret + else: + raise ValueError(f"unknown major content-type {mctype!r}") + + +def read_mail(f, subfolder, mid): + if subfolder: + f = f.get_folder(subfolder) + + msg = f.get(mid, None) + if not msg: + raise QMAuthError("no such message", mid=mid) + + return { + "head": _get_head_info(msg), + "body": _get_body(msg), + } + + +def _descent(xx): + head = _get_mime_head_info(xx) + if (mctype := head["content_maintype"]) == "message": + body = xx.get_content() + elif mctype == "multipart": + body = xx.iter_parts() + else: + body = xx.get_content() + return { + "head": head, + "body": body, + } + + +def raw_mail(f, subfolder, mid, path): + if subfolder: + f = f.get_folder(subfolder) + + msg = f.get(mid, None) + if not msg: + raise QMAuthError("no such message", mid=mid) + + pth = [int(seg) for seg in path.split(".")] if path else [] + mail = { + "head": {"content_maintype": "message", "content_subtype": "rfc822"}, + "body": msg, + } + + for n in pth: + mctype = mail["head"]["content_maintype"] + + if mctype == "multipart": + try: + res = next(islice(mail["body"], n, None)) + except StopIteration: + raise QMAuthError("out of bounds path for mail", path=pth) + mail = _descent(res) + elif mctype == "message": + assert n == 0 + mail = _descent(mail["body"]) + else: + raise QMAuthError( + f"can not descent into non multipart content type {mctype}" + ) + + if hasattr(mail["body"], "__next__"): + raise QMAuthError("can not stop at multipart section", path=pth) + + json.dump(mail["head"], stdout) + stdout.write("\n") + if isinstance(mail["body"], str): + stdout.write(mail["body"]) + elif isinstance(mail["body"], bytes): + stdout.flush() + stdout.buffer.write(mail["body"]) + else: + stdout.write(str(mail["body"])) + sysexit(0) + + +def _matches(m, pattern): + if m.is_multipart(): + return any( + 1 + for part in m.body.parts + if re.search(pattern, part.decoded()) or re.search(pattern, part.subject) + ) + return re.search(pattern, m.body.decoded()) or re.search(pattern, m.subject) + + +def search_mails(f, pattern: str, subfolder: str): + if subfolder: + f = f.get_folder(subfolder) + + return [ + { + "head": _get_head_info(msg), + "body": _get_body(msg), + } + for msg in f.values() + if _matches(msg, pattern) + ] + + +def folders(f): + return f.list_folders() + + +def move_mail(f, mid, from_, to): + if from_: + f = f.get_folder(from_) + + fname = Path(f.get_filename(mid)) + + assert to in f.list_folders() + + sep = -2 if not from_ else -3 + + if to: + res = fname.parts[:sep] + ("." + to,) + fname.parts[-2:] + else: + res = fname.parts[:sep] + fname.parts[-2:] + + fname.rename(Path(*res)) + + return 1 + + +def remove_mail(f, subdir, mid): + if subdir: + f = f.get_folder(subdir) + + f[mid].add_flag("T") + + return 1 + + +def parse_arguments(): + ap = ArgumentParser(allow_abbrev=False) + ap.add_argument("maildir_path", type=Path) + ap.add_argument("os_user") + ap.add_argument("mail_user") + + sp = ap.add_subparsers(title="methods", required=True) + + sp_list = sp.add_parser("list") + sp_list.add_argument("folder", metavar="subfolder") + sp_list.add_argument("start", type=int) + sp_list.add_argument("end", type=int) + sp_list.add_argument("sortby", metavar="sort_by") + sp_list.set_defaults(run=list_mails) + + sp_count = sp.add_parser("count") + sp_count.add_argument("subfolder") + sp_count.set_defaults(run=count_mails) + + sp_read = sp.add_parser("read") + sp_read.add_argument("subfolder") + sp_read.add_argument("mid", metavar="message") + sp_read.set_defaults(run=read_mail) + + sp_raw = sp.add_parser("raw") + sp_raw.add_argument("subfolder") + sp_raw.add_argument("mid", metavar="message") + sp_raw.add_argument("path", default="") + sp_raw.set_defaults(run=raw_mail) + + sp_folders = sp.add_parser("folders") + sp_folders.set_defaults(run=folders) + + sp_move = sp.add_parser("move") + sp_move.add_argument("mid", metavar="message") + sp_move.add_argument("from_", metavar="from") + sp_move.add_argument("to") + sp_move.set_defaults(run=move_mail) + + sp_remove = sp.add_parser("remove") + sp_remove.add_argument("subdir") + sp_remove.add_argument("mid", metavar="message") + sp_remove.set_defaults(run=remove_mail) + + sp_search = sp.add_parser("search") + sp_search.add_argument("pattern") + sp_search.add_argument("subfolder") + sp_search.set_defaults(run=search_mails) + + return vars(ap.parse_args()) + + +def main(): + try: + logging.basicConfig( + level="INFO", + format="%(levelname)s:" + str(getpid()) + ":%(message)s", + ) + args = parse_arguments() + logging.debug("started with %s", args) + s = startup( + args.pop("maildir_path"), + args.pop("os_user"), + args.pop("mail_user"), + args["run"], + ) + logging.debug("setuid successful") + run = args.pop("run") + reply = run(s, **args) + json.dump(reply, stdout) + except QMAuthError as qerr: + errmsg = dict(error=qerr.msg, **qerr.info) + json.dump(errmsg, stdout) + sysexit(3) + except Exception: + logging.exception("qmauth.py error") + sysexit(4) + + +if __name__ == "__main__": + main() diff --git a/script/testauthenticator.py b/script/testauthenticator.py new file mode 100755 index 0000000..91767ad --- /dev/null +++ b/script/testauthenticator.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 + +import logging +import os +import sys + +VALID_USER = b"mockmaildir@example.org" +VALID_PW = b"12345" + + +def main(): + with os.fdopen(3, "rb") as authfd: + inp = authfd.read() + + u, p, *r = inp.split(b"\0") + if len(r) > 2: + logging.warning("too many fields!") + sys.exit(2) + + if r and r[0]: + raise ValueError("cram currently not supported") + else: + if u == VALID_USER and p == VALID_PW: + os.execvp(sys.argv[1], sys.argv[1:]) + + sys.exit(1) + + +if __name__ == "__main__": + main() |