From a0d4300ee0945ef23bea48dfb53985a19eb6e951 Mon Sep 17 00:00:00 2001 From: "Jannis M. Hoffmann" Date: Thu, 21 Sep 2023 13:18:39 +0200 Subject: renamed qmauth to extract --- script/qmauth.py | 448 ------------------------------------------------------- 1 file changed, 448 deletions(-) delete mode 100755 script/qmauth.py (limited to 'script/qmauth.py') diff --git a/script/qmauth.py b/script/qmauth.py deleted file mode 100755 index 2d78f81..0000000 --- a/script/qmauth.py +++ /dev/null @@ -1,448 +0,0 @@ -#!/usr/bin/env python3 - -"""qmauth.py - -Extract delivers information about emails from a maildir. -Runs with elevated privileges. - -This program is started by qmail-authuser with elevated privileges after -a successful login. -Input directives are provided as command line arguments. -Output is delivered via STDOUT as json and log information via STDERR. - -Exit codes:: - - 1 reserved - 2 reserved - 3 operational error (error message in output) - 4 user error (no output) - 5 issue switching to user (no output) - 110 reserved - 111 reserved -""" - -import email.parser -import email.policy -import json -import logging -import re - -from argparse import ArgumentParser -from base64 import b64encode -from datetime import datetime -from glob import glob -from itertools import islice -from mailbox import Maildir, MaildirMessage -from os import environ, getpid, path, setuid -from pathlib import Path -from pwd import getpwnam -from sys import exit as sysexit, stdout - - -class MyMaildir(Maildir): - - def __init__(self, dirname, *args, **kwargs): - self.__path = dirname - super().__init__(dirname, *args, **kwargs) - - def get_filename(self, mid): - p_cur = glob(path.join(self.__path, 'cur', mid + '*')) - p_new = glob(path.join(self.__path, 'new', mid + '*')) - res = p_cur + p_new - if len(res) != 1: - raise LookupError(f"could not uniquely identify file for mail-id {mid!r}", mid) - return res[0] - - def get_folder(self, folder): - # copy from internal implementation - return MyMaildir( - path.join(self._path, '.' + folder), factory=self._factory, create=False, - ) - - -class QMAuthError(Exception): - - def __init__(self, msg, **args): - self.msg = msg - self.info = args - - -def _adr(addrs): - if addrs is None: - return None - return [ - {'address': addr.addr_spec, 'display_name': addr.display_name} - for addr in addrs.addresses - ] - - -def _get_rcv_time(mid): - idx = mid.find('.') - assert idx >= 0 - return float(mid[:idx]) - - -def startup(maildir, su, user, mode): - - del environ['PATH'] - - netfehcom_uid = getpwnam(su).pw_uid - if not netfehcom_uid: - logging.error("user must not be root") - sysexit(5) - try: - setuid(netfehcom_uid) - except OSError: - logging.exception("error setting uid") - sysexit(5) - - def create_messages(mail_file): - if mode == count_mails: - msg = MaildirMessage(None) - elif mode == list_mails: - msg = MaildirMessage(email.parser.BytesHeaderParser(policy=email.policy.default).parse(mail_file)) - else: - msg = email.parser.BytesParser(policy=email.policy.default).parse(mail_file) - - return msg - - return MyMaildir( - maildir / user, - create=False, - factory=create_messages, - ) - - -def _sort_by_sender(midmsg): - _, msg = midmsg - - if len(addrs := msg['from'].addresses) == 1: - return addrs[0].addr_spec - else: - return msg['sender'].address.addr_spec - - -def _sort_mails(f, sort): - - reverse = False - if sort.startswith('!'): - reverse = True - sort = sort[1:] - - by_rec_date = lambda midmsg: float(re.match(r"\d+\.\d+", midmsg[0], re.ASCII)[0]) - - if sort == 'date': keyfn = by_rec_date - elif sort == 'sender': keyfn = _sort_by_sender - elif sort == 'subject': keyfn = lambda midmsg: midmsg[1]['subject'] - elif sort == 'size': keyfn = lambda midmsg: path.getsize(f.get_filename(midmsg[0])) - elif sort == '': keyfn = by_rec_date - else: - logging.warning("unknown sort-verb %r", sort) - reverse = False - keyfn = by_rec_date - - return keyfn, reverse - - -def _get_mime_head_info(msg): - return { - 'content_maintype': msg.get_content_maintype(), - 'content_subtype': msg.get_content_subtype(), - 'content_disposition': msg.get_content_disposition(), - 'filename': msg.get_filename(), - } - - -def _get_head_info(msg): - return { - 'date': msg['date'].datetime.isoformat(), - - 'from': _adr(msg['from']), - 'sender': _adr(msg['sender']), - 'reply_to': _adr(msg['reply-to']), - - 'to': _adr(msg['to']), - 'cc': _adr(msg['cc']), - 'bcc': _adr(msg['bcc']), - - 'subject': msg['subject'], - 'comments': msg['comments'], - 'keywords': msg['keywords'], - - 'mime': _get_mime_head_info(msg), - } - - -def list_mails(f, start, end, sortby, folder): - - assert 0 <= start <= end - - if folder: - f = f.get_folder(folder) - - if start == end: - return [] - - kfn, reverse = _sort_mails(f, sortby) - msgs = list(f.items()) - msgs.sort(key=kfn, reverse=reverse) - msgs = msgs[start : min(len(msgs), end)] - - return [ - { - 'message_handle': mid, - 'byte_size': path.getsize(f.get_filename(mid)), - 'unread': 'S' in msg.get_flags(), - 'date_received': datetime.fromtimestamp(_get_rcv_time(mid)).isoformat(), - 'head': _get_head_info(msg), - } - for mid, msg in msgs - ] - - -def count_mails(f, subfolder): - if subfolder: - f = f.get_folder(subfolder) - - return { - 'total_mails': len(f), - 'byte_size': sum(path.getsize(f.get_filename(mid)) for mid in f.keys()), - 'unread_mails': len([1 for m in f if 'S' in m.get_flags()]), - } - - -def _get_body(mail): - if not mail.is_multipart(): - if mail.get_content_maintype() == 'text': - return mail.get_content() - else: - ret = mail.get_content() - if ret.isascii(): - return ret.decode(encoding='ascii') - elif len(ret) <= 128*1024: - return b64encode(ret).decode(encoding='ascii') - else: - raise QMAuthError("non attachment part too large (>512kB)", size=len(ret)) - - if (mctype := mail.get_content_maintype()) == 'message': - msg = mail.get_content() - return { - 'head': _get_head_info(msg), - 'body': _get_body(msg), - } - elif mctype == 'multipart': - ret = { - 'preamble': mail.preamble, - 'parts': [], - 'epilogue': mail.epilogue, - } - for part in mail.iter_parts(): - head = _get_mime_head_info(part) - if head['content_disposition'] != 'attachment': - body = _get_body(part) - else: - body = None - ret['parts'].append({ - 'head': head, - 'body': body, - }) - return ret - else: - raise ValueError(f"unknown major content-type {mctype!r}") - - -def read_mail(f, subfolder, mid): - if subfolder: - f = f.get_folder(subfolder) - - msg = f.get(mid, None) - if not msg: - raise QMAuthError("no such message", mid=mid) - - return { - 'head': _get_head_info(msg), - 'body': _get_body(msg), - } - - -def _descent(xx): - head = _get_mime_head_info(xx) - if (mctype := head['content_maintype']) == 'message': - body = xx.get_content() - elif mctype == 'multipart': - body = xx.iter_parts() - else: - body = xx.get_content() - return { - 'head': head, - 'body': body, - } - - -def raw_mail(f, subfolder, mid, path): - if subfolder: - f = f.get_folder(subfolder) - - msg = f.get(mid, None) - if not msg: - raise QMAuthError("no such message", mid=mid) - - pth = [int(seg) for seg in path.split('.')] if path else [] - mail = { - 'head': {"content_maintype": "message", "content_subtype": "rfc822"}, - 'body': msg, - } - - for n in pth: - mctype = mail['head']['content_maintype'] - - if mctype == 'multipart': - try: - res = next(islice(mail['body'], n, None)) - except StopIteration: - raise QMAuthError("out of bounds path for mail", path=pth) - mail = _descent(res) - elif mctype == 'message': - assert n == 0 - mail = _descent(mail['body']) - else: - raise QMAuthError(f"can not descent into non multipart content type {mctype}") - - if hasattr(mail['body'], '__next__'): - raise QMAuthError("can not stop at multipart section", path=pth) - - json.dump(mail['head'], stdout) - stdout.write("\n") - if type(mail['body']) is str: - stdout.write(mail['body']) - elif type(mail['body']) is bytes: - stdout.flush() - stdout.buffer.write(mail['body']) - else: - stdout.write(str(mail['body'])) - sysexit(0) - - -def _matches(m, pattern): - if m.is_multipart(): - return any( - 1 - for part in m.body.parts - if re.search(pattern, part.decoded()) or re.search(pattern, part.subject) - ) - return re.search(pattern, m.body.decoded()) or re.search(pattern, m.subject) - - -def search_mails(f, pattern: str, subfolder: str): - if subfolder: - f = f.get_folder(subfolder) - - return [ - { - 'head': _get_head_info(msg), - 'body': _get_body(msg), - } - for msg in f.values() - if _matches(msg, pattern) - ] - - -def folders(f): - return f.list_folders() - - -def move_mail(f, mid, from_, to): - if from_: - f = f.get_folder(from_) - - fname = Path(f.get_filename(mid)) - - assert to in f.list_folders() - - sep = -2 if not from_ else -3 - - if to: - res = fname.parts[:sep] + ('.' + to,) + fname.parts[-2:] - else: - res = fname.parts[:sep] + fname.parts[-2:] - - fname.rename(Path(*res)) - - return 1 - - -def parse_arguments(): - ap = ArgumentParser(allow_abbrev=False) - ap.add_argument('maildir_path', type=Path) - ap.add_argument('os_user') - ap.add_argument('mail_user') - - sp = ap.add_subparsers(title='methods', required=True) - - sp_list = sp.add_parser('list') - sp_list.add_argument('folder', metavar='subfolder') - sp_list.add_argument('start', type=int) - sp_list.add_argument('end', type=int) - sp_list.add_argument('sortby', metavar='sort_by') - sp_list.set_defaults(run=list_mails) - - sp_count = sp.add_parser('count') - sp_count.add_argument('subfolder') - sp_count.set_defaults(run=count_mails) - - sp_read = sp.add_parser('read') - sp_read.add_argument('subfolder') - sp_read.add_argument('mid', metavar='message') - sp_read.set_defaults(run=read_mail) - - sp_raw = sp.add_parser('raw') - sp_raw.add_argument('subfolder') - sp_raw.add_argument('mid', metavar='message') - sp_raw.add_argument('path', default='') - sp_raw.set_defaults(run=raw_mail) - - sp_folders = sp.add_parser('folders') - sp_folders.set_defaults(run=folders) - - sp_move = sp.add_parser('move') - sp_move.add_argument('mid', metavar='message') - sp_move.add_argument('from_', metavar='from') - sp_move.add_argument('to') - sp_move.set_defaults(run=move_mail) - - sp_search = sp.add_parser('search') - sp_search.add_argument('pattern') - sp_search.add_argument('subfolder') - sp_search.set_defaults(run=search_mails) - - return vars(ap.parse_args()) - - -def main(): - try: - logging.basicConfig( - level='INFO', - format="%(levelname)s:"+str(getpid())+":%(message)s", - ) - args = parse_arguments() - logging.debug("started with %s", args) - s = startup( - args.pop('maildir_path'), - args.pop('os_user'), - args.pop('mail_user'), - args['run'], - ) - logging.debug("setuid successful") - run = args.pop('run') - reply = run(s, **args) - json.dump(reply, stdout) - except QMAuthError as qerr: - errmsg = dict(error=qerr.msg, **qerr.info) - json.dump(errmsg, stdout) - sysexit(3) - except Exception: - logging.exception("qmauth.py error") - sysexit(4) - - -if __name__ == '__main__': main() -- cgit v1.2.3