summaryrefslogtreecommitdiff
path: root/script
diff options
context:
space:
mode:
Diffstat (limited to 'script')
-rwxr-xr-xscript/extract.py479
-rwxr-xr-xscript/testauthenticator.py30
2 files changed, 509 insertions, 0 deletions
diff --git a/script/extract.py b/script/extract.py
new file mode 100755
index 0000000..e771110
--- /dev/null
+++ b/script/extract.py
@@ -0,0 +1,479 @@
+#!/usr/bin/env python3
+
+"""qmauth.py
+
+Extract delivers information about emails from a maildir.
+Runs with elevated privileges.
+
+This program is started by qmail-authuser with elevated privileges after
+a successful login.
+Input directives are provided as command line arguments.
+Output is delivered via STDOUT as json and log information via STDERR.
+
+Exit codes::
+
+ 1 reserved
+ 2 reserved
+ 3 operational error (error message in output)
+ 4 user error (no output)
+ 5 issue switching to user (no output)
+ 110 reserved
+ 111 reserved
+"""
+
+import email.parser
+import email.policy
+import json
+import logging
+import re
+from argparse import ArgumentParser
+from base64 import b64encode
+from datetime import datetime
+from glob import glob
+from itertools import islice
+from mailbox import Maildir, MaildirMessage
+from os import environ, getpid, path, setuid
+from pathlib import Path
+from pwd import getpwnam
+from sys import exit as sysexit
+from sys import stdout
+
+
+class MyMaildir(Maildir):
+ def __init__(self, dirname, *args, **kwargs):
+ self.__path = dirname
+ super().__init__(dirname, *args, **kwargs)
+
+ def get_filename(self, mid):
+ p_cur = glob(path.join(self.__path, "cur", mid + "*"))
+ p_new = glob(path.join(self.__path, "new", mid + "*"))
+ res = p_cur + p_new
+ if len(res) != 1:
+ raise LookupError(
+ f"could not uniquely identify file for mail-id {mid!r}", mid
+ )
+ return res[0]
+
+ def get_folder(self, folder):
+ # copy from internal implementation
+ return MyMaildir(
+ path.join(self._path, "." + folder),
+ factory=self._factory,
+ create=False,
+ )
+
+ def _refresh(self):
+ super()._refresh()
+ for r in list(k for k in self._toc if k.startswith(".")):
+ del self._toc[r]
+
+
+class QMAuthError(Exception):
+ def __init__(self, msg, **args):
+ self.msg = msg
+ self.info = args
+
+
+def _adr(addrs):
+ if addrs is None:
+ return None
+ return [
+ {"address": addr.addr_spec, "display_name": addr.display_name}
+ for addr in addrs.addresses
+ ]
+
+
+def _get_rcv_time(mid):
+ idx = mid.find(".")
+ assert idx >= 0
+ return float(mid[:idx])
+
+
+def startup(maildir, su, user, mode):
+ del environ["PATH"]
+
+ netfehcom_uid = getpwnam(su).pw_uid
+ if not netfehcom_uid:
+ logging.error("user must not be root")
+ sysexit(5)
+ try:
+ setuid(netfehcom_uid)
+ except OSError:
+ logging.exception("error setting uid")
+ sysexit(5)
+
+ def create_messages(mail_file):
+ if mode == count_mails:
+ msg = MaildirMessage(None)
+ elif mode == list_mails:
+ msg = MaildirMessage(
+ email.parser.BytesHeaderParser(policy=email.policy.default).parse(
+ mail_file
+ )
+ )
+ else:
+ msg = email.parser.BytesParser(policy=email.policy.default).parse(mail_file)
+
+ return msg
+
+ return MyMaildir(
+ maildir / user,
+ create=False,
+ factory=create_messages,
+ )
+
+
+def _sort_by_sender(midmsg):
+ _, msg = midmsg
+
+ if len(addrs := msg["from"].addresses) == 1:
+ return addrs[0].addr_spec
+ else:
+ return msg["sender"].address.addr_spec
+
+
+def _sort_mails(f, sort):
+ reverse = False
+ if sort.startswith("!"):
+ reverse = True
+ sort = sort[1:]
+
+ def by_rec_date(midmsg):
+ return float(re.match(r"\d+\.\d+", midmsg[0], re.ASCII)[0])
+
+ if sort == "date":
+ keyfn = by_rec_date
+ elif sort == "sender":
+ keyfn = _sort_by_sender
+ elif sort == "subject":
+ keyfn = lambda midmsg: midmsg[1]["subject"]
+ elif sort == "size":
+ keyfn = lambda midmsg: path.getsize(f.get_filename(midmsg[0]))
+ elif sort == "":
+ keyfn = by_rec_date
+ else:
+ logging.warning("unknown sort-verb %r", sort)
+ reverse = False
+ keyfn = by_rec_date
+
+ return keyfn, reverse
+
+
+def _get_mime_head_info(msg):
+ return {
+ "content_maintype": msg.get_content_maintype(),
+ "content_subtype": msg.get_content_subtype(),
+ "content_disposition": msg.get_content_disposition(),
+ "filename": msg.get_filename(),
+ }
+
+
+def _get_head_info(msg):
+ return {
+ "date": msg["date"].datetime.isoformat(),
+ "from": _adr(msg["from"]),
+ "sender": _adr(msg["sender"]),
+ "reply_to": _adr(msg["reply-to"]),
+ "to": _adr(msg["to"]),
+ "cc": _adr(msg["cc"]),
+ "bcc": _adr(msg["bcc"]),
+ "subject": msg["subject"],
+ "comments": msg["comments"],
+ "keywords": msg["keywords"],
+ "mime": _get_mime_head_info(msg),
+ }
+
+
+def list_mails(f, start, end, sortby, folder):
+ assert 0 <= start <= end
+
+ if folder:
+ f = f.get_folder(folder)
+
+ if start == end:
+ return []
+
+ kfn, reverse = _sort_mails(f, sortby)
+ msgs = list(f.items())
+ msgs.sort(key=kfn, reverse=reverse)
+ msgs = msgs[start : min(len(msgs), end)]
+
+ return [
+ {
+ "message_handle": mid,
+ "byte_size": path.getsize(f.get_filename(mid)),
+ "unread": "S" in msg.get_flags(),
+ "date_received": datetime.fromtimestamp(_get_rcv_time(mid)).isoformat(),
+ "head": _get_head_info(msg),
+ }
+ for mid, msg in msgs
+ ]
+
+
+def count_mails(f, subfolder):
+ if subfolder:
+ f = f.get_folder(subfolder)
+
+ return {
+ "total_mails": len(f),
+ "byte_size": sum(path.getsize(f.get_filename(mid)) for mid in f.keys()),
+ "unread_mails": len([1 for m in f if "S" in m.get_flags()]),
+ }
+
+
+def _get_body(mail):
+ if not mail.is_multipart():
+ if mail.get_content_maintype() == "text":
+ return mail.get_content()
+ else:
+ ret = mail.get_content()
+ if ret.isascii():
+ return ret.decode(encoding="ascii")
+ elif len(ret) <= 128 * 1024:
+ return b64encode(ret).decode(encoding="ascii")
+ else:
+ raise QMAuthError(
+ "non attachment part too large (>512kB)", size=len(ret)
+ )
+
+ if (mctype := mail.get_content_maintype()) == "message":
+ msg = mail.get_content()
+ return {
+ "head": _get_head_info(msg),
+ "body": _get_body(msg),
+ }
+ elif mctype == "multipart":
+ ret = {
+ "preamble": mail.preamble,
+ "parts": [],
+ "epilogue": mail.epilogue,
+ }
+ for part in mail.iter_parts():
+ head = _get_mime_head_info(part)
+ if head["content_disposition"] != "attachment":
+ body = _get_body(part)
+ else:
+ body = None
+ ret["parts"].append(
+ {
+ "head": head,
+ "body": body,
+ }
+ )
+ return ret
+ else:
+ raise ValueError(f"unknown major content-type {mctype!r}")
+
+
+def read_mail(f, subfolder, mid):
+ if subfolder:
+ f = f.get_folder(subfolder)
+
+ msg = f.get(mid, None)
+ if not msg:
+ raise QMAuthError("no such message", mid=mid)
+
+ return {
+ "head": _get_head_info(msg),
+ "body": _get_body(msg),
+ }
+
+
+def _descent(xx):
+ head = _get_mime_head_info(xx)
+ if (mctype := head["content_maintype"]) == "message":
+ body = xx.get_content()
+ elif mctype == "multipart":
+ body = xx.iter_parts()
+ else:
+ body = xx.get_content()
+ return {
+ "head": head,
+ "body": body,
+ }
+
+
+def raw_mail(f, subfolder, mid, path):
+ if subfolder:
+ f = f.get_folder(subfolder)
+
+ msg = f.get(mid, None)
+ if not msg:
+ raise QMAuthError("no such message", mid=mid)
+
+ pth = [int(seg) for seg in path.split(".")] if path else []
+ mail = {
+ "head": {"content_maintype": "message", "content_subtype": "rfc822"},
+ "body": msg,
+ }
+
+ for n in pth:
+ mctype = mail["head"]["content_maintype"]
+
+ if mctype == "multipart":
+ try:
+ res = next(islice(mail["body"], n, None))
+ except StopIteration:
+ raise QMAuthError("out of bounds path for mail", path=pth)
+ mail = _descent(res)
+ elif mctype == "message":
+ assert n == 0
+ mail = _descent(mail["body"])
+ else:
+ raise QMAuthError(
+ f"can not descent into non multipart content type {mctype}"
+ )
+
+ if hasattr(mail["body"], "__next__"):
+ raise QMAuthError("can not stop at multipart section", path=pth)
+
+ json.dump(mail["head"], stdout)
+ stdout.write("\n")
+ if isinstance(mail["body"], str):
+ stdout.write(mail["body"])
+ elif isinstance(mail["body"], bytes):
+ stdout.flush()
+ stdout.buffer.write(mail["body"])
+ else:
+ stdout.write(str(mail["body"]))
+ sysexit(0)
+
+
+def _matches(m, pattern):
+ if m.is_multipart():
+ return any(
+ 1
+ for part in m.body.parts
+ if re.search(pattern, part.decoded()) or re.search(pattern, part.subject)
+ )
+ return re.search(pattern, m.body.decoded()) or re.search(pattern, m.subject)
+
+
+def search_mails(f, pattern: str, subfolder: str):
+ if subfolder:
+ f = f.get_folder(subfolder)
+
+ return [
+ {
+ "head": _get_head_info(msg),
+ "body": _get_body(msg),
+ }
+ for msg in f.values()
+ if _matches(msg, pattern)
+ ]
+
+
+def folders(f):
+ return f.list_folders()
+
+
+def move_mail(f, mid, from_, to):
+ if from_:
+ f = f.get_folder(from_)
+
+ fname = Path(f.get_filename(mid))
+
+ assert to in f.list_folders()
+
+ sep = -2 if not from_ else -3
+
+ if to:
+ res = fname.parts[:sep] + ("." + to,) + fname.parts[-2:]
+ else:
+ res = fname.parts[:sep] + fname.parts[-2:]
+
+ fname.rename(Path(*res))
+
+ return 1
+
+
+def remove_mail(f, subdir, mid):
+ if subdir:
+ f = f.get_folder(subdir)
+
+ f[mid].add_flag("T")
+
+ return 1
+
+
+def parse_arguments():
+ ap = ArgumentParser(allow_abbrev=False)
+ ap.add_argument("maildir_path", type=Path)
+ ap.add_argument("os_user")
+ ap.add_argument("mail_user")
+
+ sp = ap.add_subparsers(title="methods", required=True)
+
+ sp_list = sp.add_parser("list")
+ sp_list.add_argument("folder", metavar="subfolder")
+ sp_list.add_argument("start", type=int)
+ sp_list.add_argument("end", type=int)
+ sp_list.add_argument("sortby", metavar="sort_by")
+ sp_list.set_defaults(run=list_mails)
+
+ sp_count = sp.add_parser("count")
+ sp_count.add_argument("subfolder")
+ sp_count.set_defaults(run=count_mails)
+
+ sp_read = sp.add_parser("read")
+ sp_read.add_argument("subfolder")
+ sp_read.add_argument("mid", metavar="message")
+ sp_read.set_defaults(run=read_mail)
+
+ sp_raw = sp.add_parser("raw")
+ sp_raw.add_argument("subfolder")
+ sp_raw.add_argument("mid", metavar="message")
+ sp_raw.add_argument("path", default="")
+ sp_raw.set_defaults(run=raw_mail)
+
+ sp_folders = sp.add_parser("folders")
+ sp_folders.set_defaults(run=folders)
+
+ sp_move = sp.add_parser("move")
+ sp_move.add_argument("mid", metavar="message")
+ sp_move.add_argument("from_", metavar="from")
+ sp_move.add_argument("to")
+ sp_move.set_defaults(run=move_mail)
+
+ sp_remove = sp.add_parser("remove")
+ sp_remove.add_argument("subdir")
+ sp_remove.add_argument("mid", metavar="message")
+ sp_remove.set_defaults(run=remove_mail)
+
+ sp_search = sp.add_parser("search")
+ sp_search.add_argument("pattern")
+ sp_search.add_argument("subfolder")
+ sp_search.set_defaults(run=search_mails)
+
+ return vars(ap.parse_args())
+
+
+def main():
+ try:
+ logging.basicConfig(
+ level="INFO",
+ format="%(levelname)s:" + str(getpid()) + ":%(message)s",
+ )
+ args = parse_arguments()
+ logging.debug("started with %s", args)
+ s = startup(
+ args.pop("maildir_path"),
+ args.pop("os_user"),
+ args.pop("mail_user"),
+ args["run"],
+ )
+ logging.debug("setuid successful")
+ run = args.pop("run")
+ reply = run(s, **args)
+ json.dump(reply, stdout)
+ except QMAuthError as qerr:
+ errmsg = dict(error=qerr.msg, **qerr.info)
+ json.dump(errmsg, stdout)
+ sysexit(3)
+ except Exception:
+ logging.exception("qmauth.py error")
+ sysexit(4)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/script/testauthenticator.py b/script/testauthenticator.py
new file mode 100755
index 0000000..91767ad
--- /dev/null
+++ b/script/testauthenticator.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python3
+
+import logging
+import os
+import sys
+
+VALID_USER = b"mockmaildir@example.org"
+VALID_PW = b"12345"
+
+
+def main():
+ with os.fdopen(3, "rb") as authfd:
+ inp = authfd.read()
+
+ u, p, *r = inp.split(b"\0")
+ if len(r) > 2:
+ logging.warning("too many fields!")
+ sys.exit(2)
+
+ if r and r[0]:
+ raise ValueError("cram currently not supported")
+ else:
+ if u == VALID_USER and p == VALID_PW:
+ os.execvp(sys.argv[1], sys.argv[1:])
+
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()