diff options
author | Jannis M. Hoffmann <jannis@fehcom.de> | 2023-03-10 13:54:57 +0100 |
---|---|---|
committer | Jannis M. Hoffmann <jannis@fehcom.de> | 2023-03-10 13:54:57 +0100 |
commit | fcf5549584b69e62b6c2f0eb919f6799c7904211 (patch) | |
tree | e5f0e480af0f39f1c0f457ea0aca8d33f8fb4d0b /script/qmauth.py | |
parent | df59f9dec32d7f8f08706fd3eb5b784deaa0abfc (diff) |
Proper recursive rendering of mails to html
1. Added raw mode to model
2. Added raw route
3. Moved readmail view parts to RenderMail plugin
4. Renamed displayheaders partial templates
Diffstat (limited to 'script/qmauth.py')
-rwxr-xr-x | script/qmauth.py | 138 |
1 files changed, 111 insertions, 27 deletions
diff --git a/script/qmauth.py b/script/qmauth.py index 38e5d58..662b39c 100755 --- a/script/qmauth.py +++ b/script/qmauth.py @@ -31,6 +31,7 @@ from argparse import ArgumentParser from datetime import datetime from functools import cache from glob import glob +from itertools import islice from mailbox import Maildir, MaildirMessage from os import environ, getpid, path, setuid, stat from pathlib import Path @@ -51,7 +52,7 @@ class MyMaildir(Maildir): p_new = glob(path.join(self.__path, 'new', mid + '*')) res = p_cur + p_new if len(res) != 1: - raise LookupError("could not uniquely identify file for mail-id") + raise LookupError(f"could not uniquely identify file for mail-id {mid!r}", mid) return res[0] def get_folder(self, folder): @@ -61,6 +62,13 @@ class MyMaildir(Maildir): ) +class QMAuthError(Exception): + + def __init__(self, msg, **args): + self.msg = msg + self.info = args + + @cache def _file_size(fname): return stat(fname).st_size @@ -81,24 +89,34 @@ def _get_rcv_time(mid): return float(mid[:idx]) -def startup(maildir, su, user, headers_only): +def startup(maildir, su, user, mode): del environ['PATH'] netfehcom_uid = getpwnam(su).pw_uid - assert netfehcom_uid, "must be non root" + if not netfehcom_uid: + logging.error("user must not be root") + sysexit(5) try: setuid(netfehcom_uid) except OSError: logging.exception("error setting uid") sysexit(5) + def create_messages(mail_file): + if mode == count_mails: + msg = MaildirMessage(None) + elif mode == list_mails: + msg = MaildirMessage(email.parser.BytesHeaderParser(policy=email.policy.default).parse(mail_file)) + else: + msg = email.parser.BytesParser(policy=email.policy.default).parse(mail_file) + + return msg + return MyMaildir( maildir / user, create=False, - factory=lambda x: MaildirMessage( - email.parser.BytesParser(policy=email.policy.default).parse(x, headersonly=headers_only) - ), + factory=create_messages, ) @@ -203,32 +221,36 @@ def count_mails(f, subfolder): def _get_body(mail): if not mail.is_multipart(): if mail.get_content_maintype() == 'text': - return mail.get_payload(decode=True).decode( - encoding=mail.get_content_charset(failobj='utf-8'), - errors='replace') + return mail.get_content() else: - return mail.get_payload() + ret = mail.get_content() + if ret.isascii(): + return ret.decode(encoding='ascii') + raise ValueError(f"unsupported content type in leaf part {mail.get_content_type()}") if (mctype := mail.get_content_maintype()) == 'message': - mm = mail.get_payload() - assert len(mm) == 1 - msg = mm[0] + msg = mail.get_content() return { 'head': _get_head_info(msg), - 'body': msg.get_content(), + 'body': _get_body(msg), } elif mctype == 'multipart': - return { + ret = { 'preamble': mail.preamble, - 'parts': [ - { - 'head': _get_mime_head_info(part), - 'body': _get_body(part), - } - for part in mail.get_payload() - ], + 'parts': [], 'epilogue': mail.epilogue, } + for part in mail.iter_parts(): + head = _get_mime_head_info(part) + if not head['content_disposition'] == 'attachment': + body = _get_body(part) + else: + body = None + ret['parts'].append({ + 'head': head, + 'body': body, + }) + return ret else: raise ValueError(f"unknown major content-type {mctype!r}") @@ -237,9 +259,9 @@ def read_mail(f, subfolder, mid): if subfolder: f = f.get_folder(subfolder) - msg = f[mid] + msg = f.get(mid, None) if not msg: - return {'error': "no such message", 'mid': mid} + raise QMAuthError("no such message", mid=mid) return { 'head': _get_head_info(msg), @@ -247,6 +269,60 @@ def read_mail(f, subfolder, mid): } +def _descent(xx): + head = _get_mime_head_info(xx) + if (mctype := head['content_maintype']) == 'message': + body = list(xx.iter_parts())[0] + elif mctype == 'multipart': + body = xx.iter_parts() + else: + body = xx.get_content() + return { + 'head': head, + 'body': body, + } + + +def raw_mail(f, subfolder, mid, path): + if subfolder: + f = f.get_folder(subfolder) + + pth = [int(seg) for seg in path.split('.')] if path else [] + mail = { + 'head': {"content_maintype": "message", "content_subtype": "rfc822"}, + 'body': f[mid], + } + + for n in pth: + mctype = mail['head']['content_maintype'] + + if mctype == 'multipart': + try: + res = next(islice(mail['body'], n, None)) + except StopIteration: + raise QMAuthError("out of bounds path for mail", path=pth) + mail = _descent(res) + elif mctype == 'message': + assert n == 0 + mail = _descent(mail['body']) + else: + raise QMAuthError(f"can not descent into non multipart content type {mctype}") + + if hasattr(mail['body'], '__next__'): + raise QMAuthError("can not stop at multipart section", path=pth) + + json.dump(mail['head'], stdout) + stdout.write("\n") + if type(mail['body']) is str: + stdout.write(mail['body']) + elif type(mail['body']) is bytes: + stdout.flush() + stdout.buffer.write(mail['body']) + else: + stdout.write(str(mail['body'])) + sysexit(0) + + def _matches(m, pattern): if m.is_multipart(): return any( @@ -317,6 +393,12 @@ sp_read.add_argument('subfolder') sp_read.add_argument('mid', metavar='message') sp_read.set_defaults(run=read_mail) +sp_raw = sp.add_parser('raw') +sp_raw.add_argument('subfolder') +sp_raw.add_argument('mid', metavar='message') +sp_raw.add_argument('path', default='') +sp_raw.set_defaults(run=raw_mail) + sp_folders = sp.add_parser('folders') sp_folders.set_defaults(run=folders) @@ -344,14 +426,16 @@ if __name__ == '__main__': args.pop('maildir_path'), args.pop('os_user'), args.pop('mail_user'), - args['run'] == list_mails, + args['run'], ) logging.debug("setuid successful") run = args.pop('run') reply = run(s, **args) json.dump(reply, stdout) - if isinstance(reply, dict) and 'error' in reply: - sysexit(3) + except QMAuthError as qerr: + errmsg = dict(error=qerr.msg, **qerr.info) + json.dump(errmsg, stdout) + sysexit(3) except Exception: logging.exception("qmauth.py error") sysexit(4) |