diff options
Diffstat (limited to 'script/extract.py')
-rwxr-xr-x | script/extract.py | 448 |
1 files changed, 448 insertions, 0 deletions
# diff --git a/script/extract.py b/script/extract.py (new file, mode 100755)
#!/usr/bin/env python3

"""qmauth.py

Extract delivers information about emails from a maildir.
Runs with elevated privileges.

This program is started by qmail-authuser with elevated privileges after
a successful login.
Input directives are provided as command line arguments.
Output is delivered via STDOUT as json and log information via STDERR.

Exit codes::

    1 reserved
    2 reserved
    3 operational error (error message in output)
    4 user error (no output)
    5 issue switching to user (no output)
    110 reserved
    111 reserved
"""

import email.parser
import email.policy
import json
import logging
import re

from argparse import ArgumentParser
from base64 import b64encode
from datetime import datetime
from glob import escape as glob_escape, glob
from itertools import islice
from mailbox import Maildir, MaildirMessage
from os import environ, getpid, path, setuid
from pathlib import Path
from pwd import getpwnam
from sys import exit as sysexit, stdout


# Largest non-text, non-attachment part (in bytes) that `_get_body` inlines.
_MAX_INLINE_BYTES = 128 * 1024


class MyMaildir(Maildir):
    """Maildir that can resolve a message id to its file and its flags.

    When a ``factory`` is set, ``Mailbox.__getitem__`` returns
    ``factory(file)`` directly, so the flags encoded in the file name never
    reach the message object.  The helpers below read them from the file
    name instead.
    """

    def get_filename(self, mid):
        """Return the path of the file that stores message ``mid``.

        Only ``cur`` and ``new`` are searched.  A file matches when its name
        is exactly ``mid`` or ``mid`` followed by the info separator, so an
        id that is a prefix of another id is not ambiguous, and glob
        metacharacters in ``mid`` are taken literally.

        Raises:
            LookupError: if zero or more than one file matches.
        """
        found = []
        for subdir in ('cur', 'new'):
            exact = glob_escape(path.join(self._path, subdir, mid))
            found += glob(exact)
            found += glob(exact + self.colon + '*')
        if len(found) != 1:
            raise LookupError(f"could not uniquely identify file for mail-id {mid!r}", mid)
        return found[0]

    def flags_from_filename(self, fname):
        """Return the flag letters encoded in the file name ``fname``.

        The info part is whatever follows the last info separator; flags are
        only present when it starts with ``2,``.  Otherwise ``''``.
        """
        name = path.basename(fname)
        if self.colon not in name:
            return ''
        info = name.split(self.colon)[-1]
        return info[2:] if info.startswith('2,') else ''

    def get_flags(self, mid):
        """Return the flag letters of message ``mid`` (``''`` if none).

        Raises:
            LookupError: if the file for ``mid`` cannot be identified.
        """
        return self.flags_from_filename(self.get_filename(mid))

    def get_folder(self, folder):
        """Return the sub-folder ``folder`` as a ``MyMaildir``."""
        # copy from internal implementation, but keep our own class so the
        # returned folder also offers get_filename()/get_flags()
        return MyMaildir(
            path.join(self._path, '.' + folder),
            factory=self._factory,
            create=False,
        )


class QMAuthError(Exception):
    """Operational error that `main` reports as JSON with exit code 3.

    Attributes:
        msg: human readable error message.
        info: extra keyword details merged into the JSON error object.
    """

    def __init__(self, msg, **args):
        # Pass the message on so str(exc) and tracebacks are meaningful.
        super().__init__(msg)
        self.msg = msg
        self.info = args


def _adr(addrs):
    """Convert an address header into a list of JSON-friendly dicts.

    Returns ``None`` when the header is absent.
    """
    if addrs is None:
        return None
    return [
        {'address': addr.addr_spec, 'display_name': addr.display_name}
        for addr in addrs.addresses
    ]


def _get_rcv_time(mid):
    """Return the leading ``<seconds>`` part of mail-id ``mid`` as a float.

    Raises:
        ValueError: if ``mid`` has no ``.`` or the prefix is not a number.
    """
    seconds, dot, _ = mid.partition('.')
    if not dot:
        raise ValueError(f"mail-id {mid!r} has no timestamp prefix")
    return float(seconds)


def startup(maildir, su, user, mode):
    """Drop privileges to ``su`` and open the maildir of ``user``.

    Args:
        maildir: ``pathlib.Path`` of the directory holding the user maildirs.
        su: name of the OS user to switch to; must not resolve to uid 0.
        user: name of the mail user, joined onto ``maildir``.
        mode: the handler that will run; decides how much of each message
            the mailbox factory parses.

    Returns:
        A ``MyMaildir`` for ``maildir / user``.

    Exits with code 5 if the user is unknown, is root, or setuid fails.
    """
    # PATH may legitimately be unset already; that must not abort startup.
    environ.pop('PATH', None)

    try:
        netfehcom_uid = getpwnam(su).pw_uid
    except KeyError:
        logging.error("no such user %r", su)
        sysexit(5)
    if not netfehcom_uid:
        logging.error("user must not be root")
        sysexit(5)
    # NOTE(review): only the uid is changed; the group id and supplementary
    # groups of the privileged parent are kept.  Confirm this is intended.
    try:
        setuid(netfehcom_uid)
    except OSError:
        logging.exception("error setting uid")
        sysexit(5)

    def create_messages(mail_file):
        """Build the message object for one open mail file."""
        if mode == count_mails:
            # Counting needs no content at all.
            return MaildirMessage(None)
        if mode == list_mails:
            # Listing only needs the headers.
            headers = email.parser.BytesHeaderParser(policy=email.policy.default).parse(mail_file)
            return MaildirMessage(headers)
        return email.parser.BytesParser(policy=email.policy.default).parse(mail_file)

    return MyMaildir(
        maildir / user,
        create=False,
        factory=create_messages,
    )


def _sort_by_sender(midmsg):
    """Sort key: address of the sending mailbox of a ``(mid, msg)`` pair.

    Uses the single ``From`` address if there is exactly one, otherwise the
    ``Sender`` address, otherwise the first ``From`` address, otherwise
    ``''`` so messages with missing headers still sort.
    """
    _, msg = midmsg

    from_header = msg['from']
    from_addrs = from_header.addresses if from_header is not None else ()
    if len(from_addrs) == 1:
        return from_addrs[0].addr_spec

    sender = msg['sender']
    if sender is not None and sender.addresses:
        return sender.addresses[0].addr_spec
    if from_addrs:
        return from_addrs[0].addr_spec
    return ''


def _sort_mails(f, sort):
    """Translate a sort verb into ``(key_function, reverse)``.

    Args:
        f: the mailbox, used by the ``size`` key to find message files.
        sort: ``date``, ``sender``, ``subject``, ``size`` or ``''`` (date);
            a leading ``!`` reverses the order.  Unknown verbs are logged
            and fall back to ascending date.
    """
    reverse = sort.startswith('!')
    if reverse:
        sort = sort[1:]

    def by_rec_date(midmsg):
        # "<secs>.<digits>" sorts as before; ids whose second field is not
        # numeric (e.g. "<secs>.M1P2.host") fall back to "<secs>" alone.
        found = re.match(r"\d+(?:\.\d+)?", midmsg[0], re.ASCII)
        return float(found[0]) if found else 0.0

    keyfns = {
        '': by_rec_date,
        'date': by_rec_date,
        'sender': _sort_by_sender,
        # A missing Subject sorts as the empty string instead of raising.
        'subject': lambda midmsg: str(midmsg[1]['subject'] or ''),
        'size': lambda midmsg: path.getsize(f.get_filename(midmsg[0])),
    }

    keyfn = keyfns.get(sort)
    if keyfn is None:
        logging.warning("unknown sort-verb %r", sort)
        return by_rec_date, False

    return keyfn, reverse


def _get_mime_head_info(msg):
    """Return the MIME type, disposition and file name of ``msg``."""
    return {
        'content_maintype': msg.get_content_maintype(),
        'content_subtype': msg.get_content_subtype(),
        'content_disposition': msg.get_content_disposition(),
        'filename': msg.get_filename(),
    }


def _get_head_info(msg):
    """Return the JSON-friendly header summary of ``msg``.

    ``date`` is an ISO-8601 string, or ``None`` when the ``Date`` header is
    missing or has no parsed datetime.
    """
    stamp = getattr(msg['date'], 'datetime', None)
    return {
        'date': stamp.isoformat() if stamp is not None else None,

        'from': _adr(msg['from']),
        'sender': _adr(msg['sender']),
        'reply_to': _adr(msg['reply-to']),

        'to': _adr(msg['to']),
        'cc': _adr(msg['cc']),
        'bcc': _adr(msg['bcc']),

        'subject': msg['subject'],
        'comments': msg['comments'],
        'keywords': msg['keywords'],

        'mime': _get_mime_head_info(msg),
    }


def list_mails(f, start, end, sortby, folder):
    """List the messages ``start`` (inclusive) to ``end`` (exclusive).

    Args:
        f: the user's ``MyMaildir``.
        start, end: slice bounds into the sorted message list.
        sortby: sort verb, see `_sort_mails`.
        folder: sub-folder name, or ``''`` for the top-level mailbox.

    Raises:
        ValueError: unless ``0 <= start <= end``.
    """
    if not 0 <= start <= end:
        raise ValueError(f"invalid range {start}..{end}")

    if folder:
        f = f.get_folder(folder)

    if start == end:
        return []

    kfn, reverse = _sort_mails(f, sortby)
    page = sorted(f.items(), key=kfn, reverse=reverse)[start:end]

    listing = []
    for mid, msg in page:
        fname = f.get_filename(mid)
        listing.append({
            'message_handle': mid,
            'byte_size': path.getsize(fname),
            # 'S' is the "seen" flag, so a message is unread without it.
            # Flags come from the file name: factory-built messages never
            # carry them.
            'unread': 'S' not in f.flags_from_filename(fname),
            'date_received': datetime.fromtimestamp(_get_rcv_time(mid)).isoformat(),
            'head': _get_head_info(msg),
        })
    return listing


def count_mails(f, subfolder):
    """Return message count, total size and unread count of a folder.

    Everything is derived from file names and sizes; no mail is opened.
    """
    if subfolder:
        f = f.get_folder(subfolder)

    total = byte_size = unread = 0
    for mid in f.keys():
        fname = f.get_filename(mid)
        total += 1
        byte_size += path.getsize(fname)
        if 'S' not in f.flags_from_filename(fname):
            unread += 1

    return {
        'total_mails': total,
        'byte_size': byte_size,
        'unread_mails': unread,
    }


def _get_body(mail):
    """Return the body of ``mail`` as a JSON-friendly structure.

    * text leaf: the decoded text,
    * other leaf: ASCII text if the bytes are ASCII, else base64,
    * ``message/*``: ``{'head', 'body'}`` of the embedded message,
    * ``multipart/*``: preamble, epilogue and the parts; parts with
      disposition ``attachment`` get ``body: None``.

    Raises:
        QMAuthError: a non-ASCII binary leaf exceeds ``_MAX_INLINE_BYTES``.
        ValueError: a multipart container of an unexpected major type.
    """
    if not mail.is_multipart():
        content = mail.get_content()
        if mail.get_content_maintype() == 'text':
            return content
        if content.isascii():
            return content.decode(encoding='ascii')
        if len(content) <= _MAX_INLINE_BYTES:
            return b64encode(content).decode(encoding='ascii')
        # The message now states the limit that is actually enforced.
        raise QMAuthError(
            f"non attachment part too large (>{_MAX_INLINE_BYTES // 1024}kB)",
            size=len(content),
        )

    mctype = mail.get_content_maintype()
    if mctype == 'message':
        inner = mail.get_content()
        return {
            'head': _get_head_info(inner),
            'body': _get_body(inner),
        }
    if mctype == 'multipart':
        parts = []
        for part in mail.iter_parts():
            head = _get_mime_head_info(part)
            is_attachment = head['content_disposition'] == 'attachment'
            parts.append({
                'head': head,
                'body': None if is_attachment else _get_body(part),
            })
        return {
            'preamble': mail.preamble,
            'parts': parts,
            'epilogue': mail.epilogue,
        }
    raise ValueError(f"unknown major content-type {mctype!r}")


def read_mail(f, subfolder, mid):
    """Return head and body of message ``mid``.

    Raises:
        QMAuthError: if the message does not exist.
    """
    if subfolder:
        f = f.get_folder(subfolder)

    msg = f.get(mid, None)
    # Compare with None: a message object without headers has length 0 and
    # would otherwise be mistaken for "not found".
    if msg is None:
        raise QMAuthError("no such message", mid=mid)

    return {
        'head': _get_head_info(msg),
        'body': _get_body(msg),
    }


def _descent(xx):
    """Return ``{'head', 'body'}`` for one step down into part ``xx``.

    For multipart parts ``body`` is the iterator over the sub-parts, for
    everything else it is the part's content.
    """
    head = _get_mime_head_info(xx)
    if head['content_maintype'] == 'multipart':
        body = xx.iter_parts()
    else:
        body = xx.get_content()
    return {
        'head': head,
        'body': body,
    }


def raw_mail(f, subfolder, mid, path):
    """Write one part of message ``mid`` to STDOUT and exit with code 0.

    The output is the part's MIME head as one JSON line followed by the raw
    content.

    Args:
        f: the user's ``MyMaildir``.
        subfolder: sub-folder name, or ``''`` for the top-level mailbox.
        mid: message id.
        path: dot separated part indices (``"0.1"``), or ``''`` for the
            whole message.  (Shadows ``os.path`` inside this function.)

    Raises:
        QMAuthError: unknown message, index out of bounds, descent into a
            leaf, or a path that ends at a multipart container.
    """
    if subfolder:
        f = f.get_folder(subfolder)

    msg = f.get(mid, None)
    if msg is None:
        raise QMAuthError("no such message", mid=mid)

    pth = [int(seg) for seg in path.split('.')] if path else []
    mail = {
        'head': {"content_maintype": "message", "content_subtype": "rfc822"},
        'body': msg,
    }

    for n in pth:
        mctype = mail['head']['content_maintype']

        if mctype == 'multipart':
            # islice rejects negative indices with ValueError; report them
            # like any other out-of-range index.
            res = next(islice(mail['body'], n, None), None) if n >= 0 else None
            if res is None:
                raise QMAuthError("out of bounds path for mail", path=pth)
            mail = _descent(res)
        elif mctype == 'message':
            # An embedded message has exactly one child.
            if n != 0:
                raise QMAuthError("out of bounds path for mail", path=pth)
            mail = _descent(mail['body'])
        else:
            raise QMAuthError(f"can not descent into non multipart content type {mctype}")

    # _descent leaves an iterator as body when the part is a multipart.
    if hasattr(mail['body'], '__next__'):
        raise QMAuthError("can not stop at multipart section", path=pth)

    json.dump(mail['head'], stdout)
    stdout.write("\n")
    body = mail['body']
    if isinstance(body, str):
        stdout.write(body)
    elif isinstance(body, bytes):
        # Flush the text layer first so head and body keep their order.
        stdout.flush()
        stdout.buffer.write(body)
    else:
        stdout.write(str(body))
    sysexit(0)
+ + +def _matches(m, pattern): + if m.is_multipart(): + return any( + 1 + for part in m.body.parts + if re.search(pattern, part.decoded()) or re.search(pattern, part.subject) + ) + return re.search(pattern, m.body.decoded()) or re.search(pattern, m.subject) + + +def search_mails(f, pattern: str, subfolder: str): + if subfolder: + f = f.get_folder(subfolder) + + return [ + { + 'head': _get_head_info(msg), + 'body': _get_body(msg), + } + for msg in f.values() + if _matches(msg, pattern) + ] + + +def folders(f): + return f.list_folders() + + +def move_mail(f, mid, from_, to): + if from_: + f = f.get_folder(from_) + + fname = Path(f.get_filename(mid)) + + assert to in f.list_folders() + + sep = -2 if not from_ else -3 + + if to: + res = fname.parts[:sep] + ('.' + to,) + fname.parts[-2:] + else: + res = fname.parts[:sep] + fname.parts[-2:] + + fname.rename(Path(*res)) + + return 1 + + +def parse_arguments(): + ap = ArgumentParser(allow_abbrev=False) + ap.add_argument('maildir_path', type=Path) + ap.add_argument('os_user') + ap.add_argument('mail_user') + + sp = ap.add_subparsers(title='methods', required=True) + + sp_list = sp.add_parser('list') + sp_list.add_argument('folder', metavar='subfolder') + sp_list.add_argument('start', type=int) + sp_list.add_argument('end', type=int) + sp_list.add_argument('sortby', metavar='sort_by') + sp_list.set_defaults(run=list_mails) + + sp_count = sp.add_parser('count') + sp_count.add_argument('subfolder') + sp_count.set_defaults(run=count_mails) + + sp_read = sp.add_parser('read') + sp_read.add_argument('subfolder') + sp_read.add_argument('mid', metavar='message') + sp_read.set_defaults(run=read_mail) + + sp_raw = sp.add_parser('raw') + sp_raw.add_argument('subfolder') + sp_raw.add_argument('mid', metavar='message') + sp_raw.add_argument('path', default='') + sp_raw.set_defaults(run=raw_mail) + + sp_folders = sp.add_parser('folders') + sp_folders.set_defaults(run=folders) + + sp_move = sp.add_parser('move') + 
sp_move.add_argument('mid', metavar='message') + sp_move.add_argument('from_', metavar='from') + sp_move.add_argument('to') + sp_move.set_defaults(run=move_mail) + + sp_search = sp.add_parser('search') + sp_search.add_argument('pattern') + sp_search.add_argument('subfolder') + sp_search.set_defaults(run=search_mails) + + return vars(ap.parse_args()) + + +def main(): + try: + logging.basicConfig( + level='INFO', + format="%(levelname)s:"+str(getpid())+":%(message)s", + ) + args = parse_arguments() + logging.debug("started with %s", args) + s = startup( + args.pop('maildir_path'), + args.pop('os_user'), + args.pop('mail_user'), + args['run'], + ) + logging.debug("setuid successful") + run = args.pop('run') + reply = run(s, **args) + json.dump(reply, stdout) + except QMAuthError as qerr: + errmsg = dict(error=qerr.msg, **qerr.info) + json.dump(errmsg, stdout) + sysexit(3) + except Exception: + logging.exception("qmauth.py error") + sysexit(4) + + +if __name__ == '__main__': main() |