#!/usr/bin/env python3 """qmauth.py Extract delivers information about emails from a maildir. Runs with elevated privileges. This program is started by qmail-authuser with elevated privileges after a successful login. Input directives are provided as command line arguments. Output is delivered via STDOUT as json and log information via STDERR. Exit codes:: 1 reserved 2 reserved 3 operational error (error message in output) 4 user error (no output) 5 issue switching to user (no output) 110 reserved 111 reserved """ import json import logging import re from argparse import ArgumentParser from datetime import datetime from email import message_from_binary_file, policy from functools import cache from glob import glob from mailbox import Maildir, MaildirMessage from os import environ, getpid, path, setuid, stat from pathlib import Path from pwd import getpwnam from sys import exit as sysexit, stdout class MyMaildir(Maildir): def __init__(self, dirname, *args, **kwargs): self.__path = dirname super().__init__(dirname, *args, **kwargs) def get_filename(self, mid): if mid not in self: raise KeyError(mid) p_cur = glob(path.join(self.__path, 'cur', mid + '*')) p_new = glob(path.join(self.__path, 'new', mid + '*')) res = p_cur + p_new if len(res) != 1: raise LookupError("could not uniquely identify file for mail-id") return res[0] def get_folder(self, folder): # copy from internal implementation return MyMaildir( path.join(self._path, '.' + folder), factory=self._factory, create=False, ) @cache def _file_size(fname): return stat(fname).st_size def _adr(addrs): if addrs is None: return None return [ {'address': addr.addr_spec, 'display_name': addr.display_name} for addr in addrs.addresses ] def _get_rcv_time(mid): idx = mid.find('.') assert idx >= 0 return float(mid[:idx]) def startup(maildir, su, user): del environ['PATH'] netfehcom_uid = getpwnam(su).pw_uid assert netfehcom_uid, "must be non root" try: setuid(netfehcom_uid) except OSError: logging.exception("error setting uid") sysexit(5) return MyMaildir( maildir / user, create=False, factory=lambda x: MaildirMessage( message_from_binary_file(x, policy=policy.default) ), ) def _sort_by_sender(midmsg): _, msg = midmsg if len(addrs := msg['from'].addresses) == 1: return addrs[0].addr_spec else: return msg['sender'].address.addr_spec def _sort_mails(f, sort): reverse = False if sort.startswith('!'): reverse = True sort = sort[1:] by_rec_date = lambda midmsg: float(re.match(r"\d+\.\d+", midmsg[0], re.ASCII)[0]) if sort == 'date': keyfn = by_rec_date elif sort == 'sender': keyfn = _sort_by_sender elif sort == 'subject': keyfn = lambda midmsg: midmsg[1]['subject'] elif sort == 'size': keyfn = lambda midmsg: _file_size(f.get_filename(midmsg[0])) elif sort == '': keyfn = by_rec_date else: logging.warning("unknown sort-verb %r", sort) reverse = False keyfn = by_rec_date return keyfn, reverse def _get_mime_head_info(msg): return { 'content_maintype': msg.get_content_maintype(), 'content_subtype': msg.get_content_subtype(), 'content_disposition': msg.get_content_disposition(), 'filename': msg.get_filename(), } def _get_head_info(msg): return { 'date': msg['date'].datetime.isoformat(), 'from': _adr(msg['from']), 'sender': _adr(msg['sender']), 'reply_to': _adr(msg['reply-to']), 'to': _adr(msg['to']), 'cc': _adr(msg['cc']), 'bcc': _adr(msg['bcc']), 'subject': msg['subject'], 'comments': msg['comments'], 'keywords': msg['keywords'], 'mime': _get_mime_head_info(msg), } def list_mails(f, start, end, sortby, folder): assert 0 <= start <= end if folder: f = f.get_folder(folder) if start == end: return [] kfn, reverse = _sort_mails(f, sortby) msgs = list(f.items()) msgs.sort(key=kfn, reverse=reverse) msgs = msgs[start : min(len(msgs), end)] return [ { 'message_handle': mid, 'byte_size': _file_size(f.get_filename(mid)), 'unread': 'S' in msg.get_flags(), 'date_received': datetime.fromtimestamp(_get_rcv_time(mid)).isoformat(), 'head': _get_head_info(msg), } for mid, msg in msgs ] def count_mails(f, subfolder): if subfolder: f = f.get_folder(subfolder) return { 'total_mails': len(f), 'byte_size': sum(_file_size(f.get_filename(mid)) for mid in f.keys()), 'unread_mails': len([1 for m in f if 'S' in m.get_flags()]), } def _get_body(mail): if not mail.is_multipart(): if mail.get_content_maintype() == 'text': return mail.get_payload(decode=True).decode(errors='replace') else: return mail.get_payload() if (mctype := mail.get_content_maintype()) == 'message': mm = mail.get_payload() assert len(mm) == 1 msg = mm[0] return { 'head': _get_head_info(msg), 'body': msg.get_content(), } elif mctype == 'multipart': return { 'preamble': mail.preamble, 'parts': [ { 'head': _get_mime_head_info(part), 'body': _get_body(part), } for part in mail.get_payload() ], 'epilogue': mail.epilogue, } else: raise ValueError(f"unknown major content-type {mctype!r}") def read_mail(f, subfolder, mid): if subfolder: f = f.get_folder(subfolder) msg = f[mid] if not msg: return {'error': "no such message", 'mid': mid} return { 'head': _get_head_info(msg), 'body': _get_body(msg), } def _matches(m, pattern): if m.is_multipart(): return any( 1 for part in m.body.parts if re.search(pattern, part.decoded()) or re.search(pattern, part.subject) ) return re.search(pattern, m.body.decoded()) or re.search(pattern, m.subject) def search_mails(f, pattern: str, subfolder: str): if subfolder: f = f.get_folder(subfolder) return [ { 'head': _get_head_info(msg), 'body': _get_body(msg), } for msg in f.values() if _matches(msg, pattern) ] def folders(f): return f.list_folders() def move_mail(f, mid, from_, to): if from_: f = f.get_folder(from_) fname = Path(f.get_filename(mid)) assert to in f.list_folders() sep = -2 if not from_ else -3 if to: res = fname.parts[:sep] + ('.' + to,) + fname.parts[-2:] else: res = fname.parts[:sep] + fname.parts[-2:] fname.rename(Path(*res)) return 1 ap = ArgumentParser(allow_abbrev=False) ap.add_argument('maildir_path', type=Path) ap.add_argument('os_user') ap.add_argument('mail_user') sp = ap.add_subparsers(title='methods', required=True) sp_list = sp.add_parser('list') sp_list.add_argument('folder', metavar='subfolder') sp_list.add_argument('start', type=int) sp_list.add_argument('end', type=int) sp_list.add_argument('sortby', metavar='sort_by') sp_list.set_defaults(run=list_mails) sp_count = sp.add_parser('count') sp_count.add_argument('subfolder') sp_count.set_defaults(run=count_mails) sp_read = sp.add_parser('read') sp_read.add_argument('subfolder') sp_read.add_argument('mid', metavar='message') sp_read.set_defaults(run=read_mail) sp_folders = sp.add_parser('folders') sp_folders.set_defaults(run=folders) sp_move = sp.add_parser('move') sp_move.add_argument('mid', metavar='message') sp_move.add_argument('from_', metavar='from') sp_move.add_argument('to') sp_move.set_defaults(run=move_mail) sp_search = sp.add_parser('search') sp_search.add_argument('pattern') sp_search.add_argument('subfolder') sp_search.set_defaults(run=search_mails) if __name__ == '__main__': try: logging.basicConfig( level='INFO', format="%(levelname)s:"+str(getpid())+":%(message)s", ) args = vars(ap.parse_args()) logging.debug("started with %s", args) s = startup(args.pop('maildir_path'), args.pop('os_user'), args.pop('mail_user')) logging.debug("setuid successful") run = args.pop('run') reply = run(s, **args) json.dump(reply, stdout) if isinstance(reply, dict) and 'error' in reply: sysexit(3) except Exception: logging.exception("qmauth.py error") sysexit(4)