summaryrefslogtreecommitdiff
path: root/script/qmauth.py
diff options
context:
space:
mode:
authorJannis M. Hoffmann <jannis@fehcom.de>2023-09-21 13:18:39 +0200
committerJannis M. Hoffmann <jannis@fehcom.de>2023-09-21 13:18:39 +0200
commita0d4300ee0945ef23bea48dfb53985a19eb6e951 (patch)
tree257a20c11a28c6d74721e6f0a25f6dff9400f37c /script/qmauth.py
parent91450b24cefcb79f755e3766710585ac9e05eb4e (diff)
renamed qmauth to extract
Diffstat (limited to 'script/qmauth.py')
-rwxr-xr-xscript/qmauth.py448
1 files changed, 0 insertions, 448 deletions
diff --git a/script/qmauth.py b/script/qmauth.py
deleted file mode 100755
index 2d78f81..0000000
--- a/script/qmauth.py
+++ /dev/null
@@ -1,448 +0,0 @@
-#!/usr/bin/env python3
-
-"""qmauth.py
-
-Extract delivers information about emails from a maildir.
-Runs with elevated privileges.
-
-This program is started by qmail-authuser with elevated privileges after
-a successful login.
-Input directives are provided as command line arguments.
-Output is delivered via STDOUT as json and log information via STDERR.
-
-Exit codes::
-
- 1 reserved
- 2 reserved
- 3 operational error (error message in output)
- 4 user error (no output)
- 5 issue switching to user (no output)
- 110 reserved
- 111 reserved
-"""
-
-import email.parser
-import email.policy
-import json
-import logging
-import re
-
-from argparse import ArgumentParser
-from base64 import b64encode
-from datetime import datetime
-from glob import glob
-from itertools import islice
-from mailbox import Maildir, MaildirMessage
-from os import environ, getpid, path, setuid
-from pathlib import Path
-from pwd import getpwnam
-from sys import exit as sysexit, stdout
-
-
-class MyMaildir(Maildir):
-
- def __init__(self, dirname, *args, **kwargs):
- self.__path = dirname
- super().__init__(dirname, *args, **kwargs)
-
- def get_filename(self, mid):
- p_cur = glob(path.join(self.__path, 'cur', mid + '*'))
- p_new = glob(path.join(self.__path, 'new', mid + '*'))
- res = p_cur + p_new
- if len(res) != 1:
- raise LookupError(f"could not uniquely identify file for mail-id {mid!r}", mid)
- return res[0]
-
- def get_folder(self, folder):
- # copy from internal implementation
- return MyMaildir(
- path.join(self._path, '.' + folder), factory=self._factory, create=False,
- )
-
-
-class QMAuthError(Exception):
-
- def __init__(self, msg, **args):
- self.msg = msg
- self.info = args
-
-
-def _adr(addrs):
- if addrs is None:
- return None
- return [
- {'address': addr.addr_spec, 'display_name': addr.display_name}
- for addr in addrs.addresses
- ]
-
-
-def _get_rcv_time(mid):
- idx = mid.find('.')
- assert idx >= 0
- return float(mid[:idx])
-
-
-def startup(maildir, su, user, mode):
-
- del environ['PATH']
-
- netfehcom_uid = getpwnam(su).pw_uid
- if not netfehcom_uid:
- logging.error("user must not be root")
- sysexit(5)
- try:
- setuid(netfehcom_uid)
- except OSError:
- logging.exception("error setting uid")
- sysexit(5)
-
- def create_messages(mail_file):
- if mode == count_mails:
- msg = MaildirMessage(None)
- elif mode == list_mails:
- msg = MaildirMessage(email.parser.BytesHeaderParser(policy=email.policy.default).parse(mail_file))
- else:
- msg = email.parser.BytesParser(policy=email.policy.default).parse(mail_file)
-
- return msg
-
- return MyMaildir(
- maildir / user,
- create=False,
- factory=create_messages,
- )
-
-
-def _sort_by_sender(midmsg):
- _, msg = midmsg
-
- if len(addrs := msg['from'].addresses) == 1:
- return addrs[0].addr_spec
- else:
- return msg['sender'].address.addr_spec
-
-
-def _sort_mails(f, sort):
-
- reverse = False
- if sort.startswith('!'):
- reverse = True
- sort = sort[1:]
-
- by_rec_date = lambda midmsg: float(re.match(r"\d+\.\d+", midmsg[0], re.ASCII)[0])
-
- if sort == 'date': keyfn = by_rec_date
- elif sort == 'sender': keyfn = _sort_by_sender
- elif sort == 'subject': keyfn = lambda midmsg: midmsg[1]['subject']
- elif sort == 'size': keyfn = lambda midmsg: path.getsize(f.get_filename(midmsg[0]))
- elif sort == '': keyfn = by_rec_date
- else:
- logging.warning("unknown sort-verb %r", sort)
- reverse = False
- keyfn = by_rec_date
-
- return keyfn, reverse
-
-
-def _get_mime_head_info(msg):
- return {
- 'content_maintype': msg.get_content_maintype(),
- 'content_subtype': msg.get_content_subtype(),
- 'content_disposition': msg.get_content_disposition(),
- 'filename': msg.get_filename(),
- }
-
-
-def _get_head_info(msg):
- return {
- 'date': msg['date'].datetime.isoformat(),
-
- 'from': _adr(msg['from']),
- 'sender': _adr(msg['sender']),
- 'reply_to': _adr(msg['reply-to']),
-
- 'to': _adr(msg['to']),
- 'cc': _adr(msg['cc']),
- 'bcc': _adr(msg['bcc']),
-
- 'subject': msg['subject'],
- 'comments': msg['comments'],
- 'keywords': msg['keywords'],
-
- 'mime': _get_mime_head_info(msg),
- }
-
-
-def list_mails(f, start, end, sortby, folder):
-
- assert 0 <= start <= end
-
- if folder:
- f = f.get_folder(folder)
-
- if start == end:
- return []
-
- kfn, reverse = _sort_mails(f, sortby)
- msgs = list(f.items())
- msgs.sort(key=kfn, reverse=reverse)
- msgs = msgs[start : min(len(msgs), end)]
-
- return [
- {
- 'message_handle': mid,
- 'byte_size': path.getsize(f.get_filename(mid)),
- 'unread': 'S' in msg.get_flags(),
- 'date_received': datetime.fromtimestamp(_get_rcv_time(mid)).isoformat(),
- 'head': _get_head_info(msg),
- }
- for mid, msg in msgs
- ]
-
-
-def count_mails(f, subfolder):
- if subfolder:
- f = f.get_folder(subfolder)
-
- return {
- 'total_mails': len(f),
- 'byte_size': sum(path.getsize(f.get_filename(mid)) for mid in f.keys()),
- 'unread_mails': len([1 for m in f if 'S' in m.get_flags()]),
- }
-
-
-def _get_body(mail):
- if not mail.is_multipart():
- if mail.get_content_maintype() == 'text':
- return mail.get_content()
- else:
- ret = mail.get_content()
- if ret.isascii():
- return ret.decode(encoding='ascii')
- elif len(ret) <= 128*1024:
- return b64encode(ret).decode(encoding='ascii')
- else:
- raise QMAuthError("non attachment part too large (>512kB)", size=len(ret))
-
- if (mctype := mail.get_content_maintype()) == 'message':
- msg = mail.get_content()
- return {
- 'head': _get_head_info(msg),
- 'body': _get_body(msg),
- }
- elif mctype == 'multipart':
- ret = {
- 'preamble': mail.preamble,
- 'parts': [],
- 'epilogue': mail.epilogue,
- }
- for part in mail.iter_parts():
- head = _get_mime_head_info(part)
- if head['content_disposition'] != 'attachment':
- body = _get_body(part)
- else:
- body = None
- ret['parts'].append({
- 'head': head,
- 'body': body,
- })
- return ret
- else:
- raise ValueError(f"unknown major content-type {mctype!r}")
-
-
-def read_mail(f, subfolder, mid):
- if subfolder:
- f = f.get_folder(subfolder)
-
- msg = f.get(mid, None)
- if not msg:
- raise QMAuthError("no such message", mid=mid)
-
- return {
- 'head': _get_head_info(msg),
- 'body': _get_body(msg),
- }
-
-
-def _descent(xx):
- head = _get_mime_head_info(xx)
- if (mctype := head['content_maintype']) == 'message':
- body = xx.get_content()
- elif mctype == 'multipart':
- body = xx.iter_parts()
- else:
- body = xx.get_content()
- return {
- 'head': head,
- 'body': body,
- }
-
-
-def raw_mail(f, subfolder, mid, path):
- if subfolder:
- f = f.get_folder(subfolder)
-
- msg = f.get(mid, None)
- if not msg:
- raise QMAuthError("no such message", mid=mid)
-
- pth = [int(seg) for seg in path.split('.')] if path else []
- mail = {
- 'head': {"content_maintype": "message", "content_subtype": "rfc822"},
- 'body': msg,
- }
-
- for n in pth:
- mctype = mail['head']['content_maintype']
-
- if mctype == 'multipart':
- try:
- res = next(islice(mail['body'], n, None))
- except StopIteration:
- raise QMAuthError("out of bounds path for mail", path=pth)
- mail = _descent(res)
- elif mctype == 'message':
- assert n == 0
- mail = _descent(mail['body'])
- else:
- raise QMAuthError(f"can not descent into non multipart content type {mctype}")
-
- if hasattr(mail['body'], '__next__'):
- raise QMAuthError("can not stop at multipart section", path=pth)
-
- json.dump(mail['head'], stdout)
- stdout.write("\n")
- if type(mail['body']) is str:
- stdout.write(mail['body'])
- elif type(mail['body']) is bytes:
- stdout.flush()
- stdout.buffer.write(mail['body'])
- else:
- stdout.write(str(mail['body']))
- sysexit(0)
-
-
-def _matches(m, pattern):
- if m.is_multipart():
- return any(
- 1
- for part in m.body.parts
- if re.search(pattern, part.decoded()) or re.search(pattern, part.subject)
- )
- return re.search(pattern, m.body.decoded()) or re.search(pattern, m.subject)
-
-
-def search_mails(f, pattern: str, subfolder: str):
- if subfolder:
- f = f.get_folder(subfolder)
-
- return [
- {
- 'head': _get_head_info(msg),
- 'body': _get_body(msg),
- }
- for msg in f.values()
- if _matches(msg, pattern)
- ]
-
-
-def folders(f):
- return f.list_folders()
-
-
-def move_mail(f, mid, from_, to):
- if from_:
- f = f.get_folder(from_)
-
- fname = Path(f.get_filename(mid))
-
- assert to in f.list_folders()
-
- sep = -2 if not from_ else -3
-
- if to:
- res = fname.parts[:sep] + ('.' + to,) + fname.parts[-2:]
- else:
- res = fname.parts[:sep] + fname.parts[-2:]
-
- fname.rename(Path(*res))
-
- return 1
-
-
-def parse_arguments():
- ap = ArgumentParser(allow_abbrev=False)
- ap.add_argument('maildir_path', type=Path)
- ap.add_argument('os_user')
- ap.add_argument('mail_user')
-
- sp = ap.add_subparsers(title='methods', required=True)
-
- sp_list = sp.add_parser('list')
- sp_list.add_argument('folder', metavar='subfolder')
- sp_list.add_argument('start', type=int)
- sp_list.add_argument('end', type=int)
- sp_list.add_argument('sortby', metavar='sort_by')
- sp_list.set_defaults(run=list_mails)
-
- sp_count = sp.add_parser('count')
- sp_count.add_argument('subfolder')
- sp_count.set_defaults(run=count_mails)
-
- sp_read = sp.add_parser('read')
- sp_read.add_argument('subfolder')
- sp_read.add_argument('mid', metavar='message')
- sp_read.set_defaults(run=read_mail)
-
- sp_raw = sp.add_parser('raw')
- sp_raw.add_argument('subfolder')
- sp_raw.add_argument('mid', metavar='message')
- sp_raw.add_argument('path', default='')
- sp_raw.set_defaults(run=raw_mail)
-
- sp_folders = sp.add_parser('folders')
- sp_folders.set_defaults(run=folders)
-
- sp_move = sp.add_parser('move')
- sp_move.add_argument('mid', metavar='message')
- sp_move.add_argument('from_', metavar='from')
- sp_move.add_argument('to')
- sp_move.set_defaults(run=move_mail)
-
- sp_search = sp.add_parser('search')
- sp_search.add_argument('pattern')
- sp_search.add_argument('subfolder')
- sp_search.set_defaults(run=search_mails)
-
- return vars(ap.parse_args())
-
-
-def main():
- try:
- logging.basicConfig(
- level='INFO',
- format="%(levelname)s:"+str(getpid())+":%(message)s",
- )
- args = parse_arguments()
- logging.debug("started with %s", args)
- s = startup(
- args.pop('maildir_path'),
- args.pop('os_user'),
- args.pop('mail_user'),
- args['run'],
- )
- logging.debug("setuid successful")
- run = args.pop('run')
- reply = run(s, **args)
- json.dump(reply, stdout)
- except QMAuthError as qerr:
- errmsg = dict(error=qerr.msg, **qerr.info)
- json.dump(errmsg, stdout)
- sysexit(3)
- except Exception:
- logging.exception("qmauth.py error")
- sysexit(4)
-
-
-if __name__ == '__main__': main()