summaryrefslogtreecommitdiff
path: root/script/qmauth.py
diff options
context:
space:
mode:
Diffstat (limited to 'script/qmauth.py')
-rwxr-xr-xscript/qmauth.py138
1 files changed, 111 insertions, 27 deletions
diff --git a/script/qmauth.py b/script/qmauth.py
index 38e5d58..662b39c 100755
--- a/script/qmauth.py
+++ b/script/qmauth.py
@@ -31,6 +31,7 @@ from argparse import ArgumentParser
from datetime import datetime
from functools import cache
from glob import glob
+from itertools import islice
from mailbox import Maildir, MaildirMessage
from os import environ, getpid, path, setuid, stat
from pathlib import Path
@@ -51,7 +52,7 @@ class MyMaildir(Maildir):
p_new = glob(path.join(self.__path, 'new', mid + '*'))
res = p_cur + p_new
if len(res) != 1:
- raise LookupError("could not uniquely identify file for mail-id")
+ raise LookupError(f"could not uniquely identify file for mail-id {mid!r}", mid)
return res[0]
def get_folder(self, folder):
@@ -61,6 +62,13 @@ class MyMaildir(Maildir):
)
+class QMAuthError(Exception):
+
+ def __init__(self, msg, **args):
+ self.msg = msg
+ self.info = args
+
+
@cache
def _file_size(fname):
return stat(fname).st_size
@@ -81,24 +89,34 @@ def _get_rcv_time(mid):
return float(mid[:idx])
-def startup(maildir, su, user, headers_only):
+def startup(maildir, su, user, mode):
del environ['PATH']
netfehcom_uid = getpwnam(su).pw_uid
- assert netfehcom_uid, "must be non root"
+ if not netfehcom_uid:
+ logging.error("user must not be root")
+ sysexit(5)
try:
setuid(netfehcom_uid)
except OSError:
logging.exception("error setting uid")
sysexit(5)
+ def create_messages(mail_file):
+ if mode == count_mails:
+ msg = MaildirMessage(None)
+ elif mode == list_mails:
+ msg = MaildirMessage(email.parser.BytesHeaderParser(policy=email.policy.default).parse(mail_file))
+ else:
+ msg = email.parser.BytesParser(policy=email.policy.default).parse(mail_file)
+
+ return msg
+
return MyMaildir(
maildir / user,
create=False,
- factory=lambda x: MaildirMessage(
- email.parser.BytesParser(policy=email.policy.default).parse(x, headersonly=headers_only)
- ),
+ factory=create_messages,
)
@@ -203,32 +221,36 @@ def count_mails(f, subfolder):
def _get_body(mail):
if not mail.is_multipart():
if mail.get_content_maintype() == 'text':
- return mail.get_payload(decode=True).decode(
- encoding=mail.get_content_charset(failobj='utf-8'),
- errors='replace')
+ return mail.get_content()
else:
- return mail.get_payload()
+ ret = mail.get_content()
+ if ret.isascii():
+ return ret.decode(encoding='ascii')
+ raise ValueError(f"unsupported content type in leaf part {mail.get_content_type()}")
if (mctype := mail.get_content_maintype()) == 'message':
- mm = mail.get_payload()
- assert len(mm) == 1
- msg = mm[0]
+ msg = mail.get_content()
return {
'head': _get_head_info(msg),
- 'body': msg.get_content(),
+ 'body': _get_body(msg),
}
elif mctype == 'multipart':
- return {
+ ret = {
'preamble': mail.preamble,
- 'parts': [
- {
- 'head': _get_mime_head_info(part),
- 'body': _get_body(part),
- }
- for part in mail.get_payload()
- ],
+ 'parts': [],
'epilogue': mail.epilogue,
}
+ for part in mail.iter_parts():
+ head = _get_mime_head_info(part)
+ if not head['content_disposition'] == 'attachment':
+ body = _get_body(part)
+ else:
+ body = None
+ ret['parts'].append({
+ 'head': head,
+ 'body': body,
+ })
+ return ret
else:
raise ValueError(f"unknown major content-type {mctype!r}")
@@ -237,9 +259,9 @@ def read_mail(f, subfolder, mid):
if subfolder:
f = f.get_folder(subfolder)
- msg = f[mid]
+ msg = f.get(mid, None)
if not msg:
- return {'error': "no such message", 'mid': mid}
+ raise QMAuthError("no such message", mid=mid)
return {
'head': _get_head_info(msg),
@@ -247,6 +269,60 @@ def read_mail(f, subfolder, mid):
}
+def _descent(xx):
+ head = _get_mime_head_info(xx)
+ if (mctype := head['content_maintype']) == 'message':
+ body = list(xx.iter_parts())[0]
+ elif mctype == 'multipart':
+ body = xx.iter_parts()
+ else:
+ body = xx.get_content()
+ return {
+ 'head': head,
+ 'body': body,
+ }
+
+
+def raw_mail(f, subfolder, mid, path):
+ if subfolder:
+ f = f.get_folder(subfolder)
+
+ pth = [int(seg) for seg in path.split('.')] if path else []
+ mail = {
+ 'head': {"content_maintype": "message", "content_subtype": "rfc822"},
+ 'body': f[mid],
+ }
+
+ for n in pth:
+ mctype = mail['head']['content_maintype']
+
+ if mctype == 'multipart':
+ try:
+ res = next(islice(mail['body'], n, None))
+ except StopIteration:
+ raise QMAuthError("out of bounds path for mail", path=pth)
+ mail = _descent(res)
+ elif mctype == 'message':
+ assert n == 0
+ mail = _descent(mail['body'])
+ else:
+ raise QMAuthError(f"can not descent into non multipart content type {mctype}")
+
+ if hasattr(mail['body'], '__next__'):
+ raise QMAuthError("can not stop at multipart section", path=pth)
+
+ json.dump(mail['head'], stdout)
+ stdout.write("\n")
+ if type(mail['body']) is str:
+ stdout.write(mail['body'])
+ elif type(mail['body']) is bytes:
+ stdout.flush()
+ stdout.buffer.write(mail['body'])
+ else:
+ stdout.write(str(mail['body']))
+ sysexit(0)
+
+
def _matches(m, pattern):
if m.is_multipart():
return any(
@@ -317,6 +393,12 @@ sp_read.add_argument('subfolder')
sp_read.add_argument('mid', metavar='message')
sp_read.set_defaults(run=read_mail)
+sp_raw = sp.add_parser('raw')
+sp_raw.add_argument('subfolder')
+sp_raw.add_argument('mid', metavar='message')
+sp_raw.add_argument('path', default='')
+sp_raw.set_defaults(run=raw_mail)
+
sp_folders = sp.add_parser('folders')
sp_folders.set_defaults(run=folders)
@@ -344,14 +426,16 @@ if __name__ == '__main__':
args.pop('maildir_path'),
args.pop('os_user'),
args.pop('mail_user'),
- args['run'] == list_mails,
+ args['run'],
)
logging.debug("setuid successful")
run = args.pop('run')
reply = run(s, **args)
json.dump(reply, stdout)
- if isinstance(reply, dict) and 'error' in reply:
- sysexit(3)
+ except QMAuthError as qerr:
+ errmsg = dict(error=qerr.msg, **qerr.info)
+ json.dump(errmsg, stdout)
+ sysexit(3)
except Exception:
logging.exception("qmauth.py error")
sysexit(4)