diff options
Diffstat (limited to 'script/qmauth.py')
-rwxr-xr-x | script/qmauth.py | 349 |
1 files changed, 349 insertions, 0 deletions
diff --git a/script/qmauth.py b/script/qmauth.py new file mode 100755 index 0000000..7803483 --- /dev/null +++ b/script/qmauth.py @@ -0,0 +1,349 @@ +#!/usr/bin/env python3 + +"""qmauth.py + +Extract delivers information about emails from a maildir. +Runs with elevated privileges. + +This program is started by qmail-authuser with elevated privileges after +a successful login. +Input directives are provided as command line arguments. +Output is delivered via STDOUT as json and log information via STDERR. + +Exit codes:: + + 1 reserved + 2 reserved + 3 operational error (error message in output) + 4 user error (no output) + 5 issue switching to user (no output) + 110 reserved + 111 reserved +""" + +import json +import logging +import re + +from argparse import ArgumentParser +from datetime import datetime +from email import message_from_binary_file, policy +from functools import cache +from glob import glob +from mailbox import Maildir, MaildirMessage +from os import environ, getpid, path, setuid, stat +from pathlib import Path +from pwd import getpwnam +from sys import exit as sysexit, stdout + + +class MyMaildir(Maildir): + + def __init__(self, dirname, *args, **kwargs): + self.__path = dirname + super().__init__(dirname, *args, **kwargs) + + def get_filename(self, mid): + if mid not in self: + raise KeyError(mid) + p_cur = glob(path.join(self.__path, 'cur', mid + '*')) + p_new = glob(path.join(self.__path, 'new', mid + '*')) + res = p_cur + p_new + if len(res) != 1: + raise LookupError("could not uniquely identify file for mail-id") + return res[0] + + def get_folder(self, folder): + # copy from internal implementation + return MyMaildir( + path.join(self._path, '.' + folder), factory=self._factory, create=False, + ) + + +@cache +def _file_size(fname): + return stat(fname).st_size + + +def _adr(addrs): + if addrs is None: + return None + return [ + {'address': addr.addr_spec, 'display_name': addr.display_name} + for addr in addrs.addresses + ] + + +def _get_rcv_time(mid): + idx = mid.find('.') + assert idx >= 0 + return float(mid[:idx]) + + +def startup(maildir, su, user): + + del environ['PATH'] + + netfehcom_uid = getpwnam(su).pw_uid + assert netfehcom_uid, "must be non root" + try: + setuid(netfehcom_uid) + except OSError: + logging.exception("error setting uid") + sysexit(5) + + return MyMaildir( + maildir / user, + create=False, + factory=lambda x: MaildirMessage( + message_from_binary_file(x, policy=policy.default) + ), + ) + + +def _sort_by_sender(midmsg): + _, msg = midmsg + + if len(addrs := msg['from'].addresses) == 1: + return addrs[0].addr_spec + else: + return msg['sender'].address.addr_spec + + +def _sort_mails(f, sort): + + reverse = False + if sort.startswith('!'): + reverse = True + sort = sort[1:] + + by_rec_date = lambda midmsg: float(re.match(r"\d+\.\d+", midmsg[0], re.ASCII)[0]) + + if sort == 'date': keyfn = by_rec_date + elif sort == 'sender': keyfn = _sort_by_sender + elif sort == 'subject': keyfn = lambda midmsg: midmsg[1]['subject'] + elif sort == 'size': keyfn = lambda midmsg: _file_size(f.get_filename(midmsg[0])) + elif sort == '': keyfn = by_rec_date + else: + logging.warning("unknown sort-verb %r", sort) + reverse = False + keyfn = by_rec_date + + return keyfn, reverse + + +def _get_mime_head_info(msg): + return { + 'content_maintype': msg.get_content_maintype(), + 'content_subtype': msg.get_content_subtype(), + 'content_disposition': msg.get_content_disposition(), + 'filename': msg.get_filename(), + } + + +def _get_head_info(msg): + return { + 'date': msg['date'].datetime.isoformat(), + + 'from': _adr(msg['from']), + 'sender': _adr(msg['sender']), + 'reply_to': _adr(msg['reply-to']), + + 'to': _adr(msg['to']), + 'cc': _adr(msg['cc']), + 'bcc': _adr(msg['bcc']), + + 'subject': msg['subject'], + 'comments': msg['comments'], + 'keywords': msg['keywords'], + + 'mime': _get_mime_head_info(msg), + } + + +def list_mails(f, start, end, sortby, folder): + + assert 0 <= start <= end + + if folder: + f = f.get_folder(folder) + + if start == end: + return [] + + kfn, reverse = _sort_mails(f, sortby) + msgs = list(f.items()) + msgs.sort(key=kfn, reverse=reverse) + msgs = msgs[start : min(len(msgs), end)] + + return [ + { + 'message_handle': mid, + 'byte_size': _file_size(f.get_filename(mid)), + 'unread': 'S' in msg.get_flags(), + 'date_received': datetime.fromtimestamp(_get_rcv_time(mid)).isoformat(), + 'head': _get_head_info(msg), + } + for mid, msg in msgs + ] + + +def count_mails(f, subfolder): + if subfolder: + f = f.get_folder(subfolder) + + return { + 'total_mails': len(f), + 'byte_size': sum(_file_size(f.get_filename(mid)) for mid in f.keys()), + 'unread_mails': len([1 for m in f if 'S' in m.get_flags()]), + } + + +def _get_body(mail): + if not mail.is_multipart(): + if mail.get_content_maintype() == 'text': + return mail.get_payload(decode=True).decode() + else: + return mail.get_payload() + + if (mctype := mail.get_content_maintype()) == 'message': + mm = mail.get_payload() + assert len(mm) == 1 + msg = mm[0] + return { + 'head': _get_head_info(msg), + 'body': msg.get_content(), + } + elif mctype == 'multipart': + return { + 'preamble': mail.preamble, + 'parts': [ + { + 'head': _get_mime_head_info(part), + 'body': _get_body(part), + } + for part in mail.get_payload() + ], + 'epilogue': mail.epilogue, + } + else: + raise ValueError(f"unknown major content-type {mctype!r}") + + +def read_mail(f, subfolder, mid): + if subfolder: + f = f.get_folder(subfolder) + + msg = f[mid] + if not msg: + return {'error': "no such message", 'mid': mid} + + return { + 'head': _get_head_info(msg), + 'body': _get_body(msg), + } + + +def _matches(m, pattern): + if m.is_multipart(): + return any( + 1 + for part in m.body.parts + if re.search(pattern, part.decoded()) or re.search(pattern, part.subject) + ) + return re.search(pattern, m.body.decoded()) or re.search(pattern, m.subject) + + +def search_mails(f, pattern: str, subfolder: str): + if subfolder: + f = f.get_folder(subfolder) + + return [ + { + 'head': _get_head_info(msg), + 'body': _get_body(msg), + } + for msg in f.values() + if _matches(msg, pattern) + ] + + +def folders(f): + return f.list_folders() + + +def move_mail(f, mid, from_, to): + if from_: + f = f.get_folder(from_) + + fname = Path(f.get_filename(mid)) + + assert to in f.list_folders() + + sep = -2 if not from_ else -3 + + if to: + res = fname.parts[:sep] + ('.' + to,) + fname.parts[-2:] + else: + res = fname.parts[:sep] + fname.parts[-2:] + + fname.rename(Path(*res)) + + return 1 + + +ap = ArgumentParser(allow_abbrev=False) +ap.add_argument('maildir_path', type=Path) +ap.add_argument('os_user') +ap.add_argument('mail_user') +sp = ap.add_subparsers(title='methods', required=True) + +sp_list = sp.add_parser('list') +sp_list.add_argument('folder', metavar='subfolder') +sp_list.add_argument('start', type=int) +sp_list.add_argument('end', type=int) +sp_list.add_argument('sortby', metavar='sort_by') +sp_list.set_defaults(run=list_mails) + +sp_count = sp.add_parser('count') +sp_count.add_argument('subfolder') +sp_count.set_defaults(run=count_mails) + +sp_read = sp.add_parser('read') +sp_read.add_argument('subfolder') +sp_read.add_argument('mid', metavar='message') +sp_read.set_defaults(run=read_mail) + +sp_folders = sp.add_parser('folders') +sp_folders.set_defaults(run=folders) + +sp_move = sp.add_parser('move') +sp_move.add_argument('mid', metavar='message') +sp_move.add_argument('from_', metavar='from') +sp_move.add_argument('to') +sp_move.set_defaults(run=move_mail) + +sp_search = sp.add_parser('search') +sp_search.add_argument('pattern') +sp_search.add_argument('subfolder') +sp_search.set_defaults(run=search_mails) + + +if __name__ == '__main__': + try: + logging.basicConfig( + level='INFO', + format="%(levelname)s:"+str(getpid())+":%(message)s", + ) + args = vars(ap.parse_args()) + logging.debug("started with %s", args) + s = startup(args.pop('maildir_path'), args.pop('os_user'), args.pop('mail_user')) + logging.debug("setuid successful") + run = args.pop('run') + reply = run(s, **args) + json.dump(reply, stdout) + if isinstance(reply, dict) and 'error' in reply: + sysexit(3) + except Exception: + logging.exception("qmauth.py error") + sysexit(4) |