diff options
Diffstat (limited to 'script/qmauth.py')
-rwxr-xr-x | script/qmauth.py | 138 |
1 files changed, 111 insertions, 27 deletions
diff --git a/script/qmauth.py b/script/qmauth.py index 38e5d58..662b39c 100755 --- a/script/qmauth.py +++ b/script/qmauth.py @@ -31,6 +31,7 @@ from argparse import ArgumentParser from datetime import datetime from functools import cache from glob import glob +from itertools import islice from mailbox import Maildir, MaildirMessage from os import environ, getpid, path, setuid, stat from pathlib import Path @@ -51,7 +52,7 @@ class MyMaildir(Maildir): p_new = glob(path.join(self.__path, 'new', mid + '*')) res = p_cur + p_new if len(res) != 1: - raise LookupError("could not uniquely identify file for mail-id") + raise LookupError(f"could not uniquely identify file for mail-id {mid!r}", mid) return res[0] def get_folder(self, folder): @@ -61,6 +62,13 @@ class MyMaildir(Maildir): ) +class QMAuthError(Exception): + + def __init__(self, msg, **args): + self.msg = msg + self.info = args + + @cache def _file_size(fname): return stat(fname).st_size @@ -81,24 +89,34 @@ def _get_rcv_time(mid): return float(mid[:idx]) -def startup(maildir, su, user, headers_only): +def startup(maildir, su, user, mode): del environ['PATH'] netfehcom_uid = getpwnam(su).pw_uid - assert netfehcom_uid, "must be non root" + if not netfehcom_uid: + logging.error("user must not be root") + sysexit(5) try: setuid(netfehcom_uid) except OSError: logging.exception("error setting uid") sysexit(5) + def create_messages(mail_file): + if mode == count_mails: + msg = MaildirMessage(None) + elif mode == list_mails: + msg = MaildirMessage(email.parser.BytesHeaderParser(policy=email.policy.default).parse(mail_file)) + else: + msg = email.parser.BytesParser(policy=email.policy.default).parse(mail_file) + + return msg + return MyMaildir( maildir / user, create=False, - factory=lambda x: MaildirMessage( - email.parser.BytesParser(policy=email.policy.default).parse(x, headersonly=headers_only) - ), + factory=create_messages, ) @@ -203,32 +221,36 @@ def count_mails(f, subfolder): def _get_body(mail): if not mail.is_multipart(): if mail.get_content_maintype() == 'text': - return mail.get_payload(decode=True).decode( - encoding=mail.get_content_charset(failobj='utf-8'), - errors='replace') + return mail.get_content() else: - return mail.get_payload() + ret = mail.get_content() + if ret.isascii(): + return ret.decode(encoding='ascii') + raise ValueError(f"unsupported content type in leaf part {mail.get_content_type()}") if (mctype := mail.get_content_maintype()) == 'message': - mm = mail.get_payload() - assert len(mm) == 1 - msg = mm[0] + msg = mail.get_content() return { 'head': _get_head_info(msg), - 'body': msg.get_content(), + 'body': _get_body(msg), } elif mctype == 'multipart': - return { + ret = { 'preamble': mail.preamble, - 'parts': [ - { - 'head': _get_mime_head_info(part), - 'body': _get_body(part), - } - for part in mail.get_payload() - ], + 'parts': [], 'epilogue': mail.epilogue, } + for part in mail.iter_parts(): + head = _get_mime_head_info(part) + if not head['content_disposition'] == 'attachment': + body = _get_body(part) + else: + body = None + ret['parts'].append({ + 'head': head, + 'body': body, + }) + return ret else: raise ValueError(f"unknown major content-type {mctype!r}") @@ -237,9 +259,9 @@ def read_mail(f, subfolder, mid): if subfolder: f = f.get_folder(subfolder) - msg = f[mid] + msg = f.get(mid, None) if not msg: - return {'error': "no such message", 'mid': mid} + raise QMAuthError("no such message", mid=mid) return { 'head': _get_head_info(msg), @@ -247,6 +269,60 @@ def read_mail(f, subfolder, mid): } +def _descent(xx): + head = _get_mime_head_info(xx) + if (mctype := head['content_maintype']) == 'message': + body = list(xx.iter_parts())[0] + elif mctype == 'multipart': + body = xx.iter_parts() + else: + body = xx.get_content() + return { + 'head': head, + 'body': body, + } + + +def raw_mail(f, subfolder, mid, path): + if subfolder: + f = f.get_folder(subfolder) + + pth = [int(seg) for seg in path.split('.')] if path else [] + mail = { + 'head': {"content_maintype": "message", "content_subtype": "rfc822"}, + 'body': f[mid], + } + + for n in pth: + mctype = mail['head']['content_maintype'] + + if mctype == 'multipart': + try: + res = next(islice(mail['body'], n, None)) + except StopIteration: + raise QMAuthError("out of bounds path for mail", path=pth) + mail = _descent(res) + elif mctype == 'message': + assert n == 0 + mail = _descent(mail['body']) + else: + raise QMAuthError(f"can not descent into non multipart content type {mctype}") + + if hasattr(mail['body'], '__next__'): + raise QMAuthError("can not stop at multipart section", path=pth) + + json.dump(mail['head'], stdout) + stdout.write("\n") + if type(mail['body']) is str: + stdout.write(mail['body']) + elif type(mail['body']) is bytes: + stdout.flush() + stdout.buffer.write(mail['body']) + else: + stdout.write(str(mail['body'])) + sysexit(0) + + def _matches(m, pattern): if m.is_multipart(): return any( @@ -317,6 +393,12 @@ sp_read.add_argument('subfolder') sp_read.add_argument('mid', metavar='message') sp_read.set_defaults(run=read_mail) +sp_raw = sp.add_parser('raw') +sp_raw.add_argument('subfolder') +sp_raw.add_argument('mid', metavar='message') +sp_raw.add_argument('path', default='') +sp_raw.set_defaults(run=raw_mail) + sp_folders = sp.add_parser('folders') sp_folders.set_defaults(run=folders) @@ -344,14 +426,16 @@ if __name__ == '__main__': args.pop('maildir_path'), args.pop('os_user'), args.pop('mail_user'), - args['run'] == list_mails, + args['run'], ) logging.debug("setuid successful") run = args.pop('run') reply = run(s, **args) json.dump(reply, stdout) - if isinstance(reply, dict) and 'error' in reply: - sysexit(3) + except QMAuthError as qerr: + errmsg = dict(error=qerr.msg, **qerr.info) + json.dump(errmsg, stdout) + sysexit(3) except Exception: logging.exception("qmauth.py error") sysexit(4) |