summaryrefslogtreecommitdiff
path: root/script/extract.py
diff options
context:
space:
mode:
Diffstat (limited to 'script/extract.py')
-rwxr-xr-xscript/extract.py805
1 files changed, 406 insertions, 399 deletions
diff --git a/script/extract.py b/script/extract.py
index c670bef..f311fba 100755
--- a/script/extract.py
+++ b/script/extract.py
@@ -5,47 +5,31 @@
Extract delivers information about emails from a maildir.
Runs with elevated privileges.
-This program is started by qmail-authuser with elevated privileges after
-a successful login.
-
-The run method is provided by a command line argument.
-Additional data is read from STDIN as protobuf.
-Output is delivered via STDOUT as protobuf and log information via STDERR.
-
-Exit codes::
-
- 1 reserved
- 2 reserved
- 3 operational error (error message in output)
- 4 user error (no output)
- 5 issue switching to user (no output)
- 110 reserved
- 111 reserved
+This program is started by qmail-authuser with elevated privileges after a
+successful login.
"""
import email.parser
import email.policy
import logging
import re
-from argparse import ArgumentParser
+import socket
from base64 import b64encode
from datetime import datetime
from email.message import EmailMessage
from itertools import islice
-from mailbox import Maildir, MaildirMessage
-from os import environ, getpid, mkdir, path, setuid
+from mailbox import Maildir, MaildirMessage, NoSuchMailboxError
+from os import environ, getpid, mkdir, path, setuid, stat
from pathlib import Path
from pwd import getpwnam
from sys import exit as sysexit
-from sys import stdin, stdout
-import jwebmail.model.jwebmail_pb2 as jwebmail
+import varlink
class MyMaildir(Maildir):
def __init__(self, dirname, parent=None, *args, **kwargs):
self.__path = Path(dirname)
- self.__parent = parent
self.set_msgtype("MaildirMessage")
super().__init__(dirname, *args, **kwargs)
@@ -96,16 +80,9 @@ class MyMaildir(Maildir):
"""
return type(self)(
path.join(self._path, "." + folder),
- parent=self,
create=False,
)
- def list_folders(self):
- if self.__parent is not None:
- return self.__parent.list_folders()
- else:
- return super().list_folders()
-
def _refresh(self):
"""
This override of internal method _refresh that strips out 'hidden' files.
@@ -116,427 +93,462 @@ class MyMaildir(Maildir):
del self._toc[r]
-class QMAuthError(Exception):
- def __init__(self, msg, **args):
- self.msg = msg
- self.info = args
+service = varlink.Service(
+ vendor="JWebmail",
+ product="Mail Storage Interface",
+ version="1",
+ url="https://www.fehcom.de/cgit/jwebmail2",
+ interface_dir=path.join(path.dirname(__file__), "../src/jwebmail/model/"),
+)
-def _adr(addrs):
- if addrs is None:
- return []
- return [
- jwebmail.MailHeader.MailAddr(
- address=addr.addr_spec,
- name=addr.display_name,
+class InvalidUserError(varlink.VarlinkError):
+
+ def __init__(self, unix_user):
+ super().__init__(
+ {
+ "error": "de.jmhoffmann.jwebmail.mail-storage.InvalidUser",
+ "parameters": {
+ "unix_user": unix_user,
+ },
+ }
)
- for addr in addrs.addresses
- ]
-def _get_rcv_time(mid):
- idx = mid.find(".")
- assert idx > 0
- return float(mid[:idx])
+class NotInitializedError(varlink.VarlinkError):
+ def __init__(self):
+ super().__init__(
+ {"error": "de.jmhoffmann.jwebmail.mail-storage.NotInitialized"}
+ )
-def startup(maildir, su, user, mode):
- del environ["PATH"]
- netfehcom_uid = getpwnam(su).pw_uid
- if not netfehcom_uid:
- logging.error("user must not be root")
- sysexit(5)
- try:
- setuid(netfehcom_uid)
- except OSError:
- logging.exception("error setting uid")
- sysexit(5)
-
- return MyMaildir(maildir / user, create=False)
-
-
-def _sort_by_sender(midmsg):
- _, msg = midmsg
-
- if len(addrs := msg["from"].addresses) == 1:
- return addrs[0].addr_spec
- else:
- return msg["sender"].address.addr_spec
-
-
-def _sort_mails(f, sort):
- reverse = False
- if sort.startswith("!"):
- reverse = True
- sort = sort[1:]
-
- def by_rec_date(midmsg):
- return float(re.match(r"\d+\.\d+", midmsg[0], re.ASCII)[0])
-
- if sort == "date":
- keyfn = by_rec_date
- elif sort == "sender":
- keyfn = _sort_by_sender
- elif sort == "subject":
- # fmt: off
- def keyfn(midmsg): return midmsg[1]["subject"]
- # fmt: on
- elif sort == "size":
- # fmt: off
- def keyfn(midmsg): return path.getsize(f.get_filename(midmsg[0]))
- # fmt: on
- elif sort == "":
- keyfn = by_rec_date
- else:
- logging.warning("unknown sort-verb %r", sort)
- reverse = False
- keyfn = by_rec_date
-
- return keyfn, reverse
-
-
-def _get_mime_head_info(msg):
- mh = jwebmail.MIMEHeader(
- maintype=msg.get_content_maintype(),
- subtype=msg.get_content_subtype(),
- )
- if (cd := msg.get_content_disposition()) == "inline":
- mh.contentdispo = (
- jwebmail.MIMEHeader.ContentDisposition.CONTENT_DISPOSITION_INLINE
- )
- elif cd == "attachment":
- mh.contentdispo = (
- jwebmail.MIMEHeader.ContentDisposition.CONTENT_DISPOSITION_ATTACHMENT
- )
- elif cd is None:
- mh.contentdispo = (
- jwebmail.MIMEHeader.ContentDisposition.CONTENT_DISPOSITION_NONE
+class InvalidMailboxError(varlink.VarlinkError):
+
+ def __init__(self, path, not_a_mailbox, user_mismatch):
+ super().__init__(
+ {
+ "error": "de.jmhoffmann.jwebmail.mail-storage.InvalidMailbox",
+ "parameters": {
+ "path": path,
+ "not_a_mailbox": not_a_mailbox,
+ "user_mismatch": user_mismatch,
+ },
+ }
)
- else:
- assert False
- if fn := msg.get_filename():
- mh.file_name = fn
- return mh
+class InvalidMIDError(varlink.VarlinkError):
+ def __init__(self, folder, mid):
+ super().__init__(
+ {
+ "error": "de.jmhoffmann.jwebmail.mail-storage.InvalidMID",
+ "parameters": {
+ "folder": folder,
+ "mid": mid,
+ },
+ }
+ )
-def _get_head_info(msg):
- mh = jwebmail.MailHeader(
- send_date=msg["date"].datetime.isoformat(),
- written_from=_adr(msg["from"]),
- reply_to=_adr(msg["reply-to"]),
- send_to=_adr(msg["to"]),
- cc=_adr(msg["cc"]),
- bcc=_adr(msg["bcc"]),
- subject=msg["subject"],
- comments=msg["comments"],
- keywords=msg["keywords"],
- mime=_get_mime_head_info(msg),
- )
- if s := _adr(msg["sender"]):
- mh.sender = s[0]
+class InvalidPathInMailError(varlink.VarlinkError):
- return mh
+ def __init__(self, folder, mid, path):
+ super().__init__(
+ {
+ "error": "de.jmhoffmann.jwebmail.mail-storage.InvalidPathInMail",
+ "parameters": {
+ "folder": folder,
+ "mid": mid,
+ "path": path,
+ },
+ }
+ )
-def list_mails(f, req):
- r = jwebmail.ListReq()
- r.ParseFromString(req)
+@service.interface("de.jmhoffmann.jwebmail.mail-storage")
+class MailStorage:
- assert 0 <= r.start <= r.end
+ def __init__(self):
+ self.maildir = None
- if r.folder:
- f = f.get_folder(r.folder)
+ def Init(self, su, maildir):
+ del environ["PATH"]
- f.set_msgtype("Maildir(EmailMessageHeader)")
+ try:
+ uid = getpwnam(su).pw_uid
+ if not uid:
+ raise InvalidUserError(su)
- if r.start == r.end:
- return []
+ setuid(uid)
- kfn, reverse = _sort_mails(f, r.sort)
- msgs = list(f.items())
- msgs.sort(key=kfn, reverse=reverse)
- msgs = msgs[r.start : min(len(msgs), r.end)]
+ if stat(maildir).st_uid != uid:
+ raise InvalidMailboxError(
+ maildir, not_a_mailbox=False, user_mismatch=True
+ )
- items = [
- jwebmail.ListMailHeader(
- mid=mid,
- byte_size=path.getsize(f.get_filename(mid)),
- unread="S" not in msg.get_flags(),
- rec_date=datetime.fromtimestamp(_get_rcv_time(mid)).isoformat(),
- header=_get_head_info(msg),
+ self.maildir = MyMaildir(maildir, create=False)
+ except KeyError:
+ raise InvalidUserError(su)
+ except (FileNotFoundError, NoSuchMailboxError):
+ raise InvalidMailboxError(maildir, not_a_mailbox=True, user_mismatch=False)
+
+ def List(self, folder, start, end, sort):
+ if self.maildir is None:
+ raise NotInitializedError()
+
+ assert 0 <= start <= end
+
+ maildir = self.maildir
+ if folder:
+ maildir = maildir.get_folder(folder)
+
+ maildir.set_msgtype("Maildir(EmailMessageHeader)")
+
+ if start == end:
+ return []
+
+ kfn, reverse = self._sort_mails(maildir, sort)
+ msgs = list(maildir.items())
+ msgs.sort(key=kfn, reverse=reverse)
+ msgs = msgs[start : min(len(msgs), end)]
+
+ def _get_rcv_time(mid):
+ return float(mid[: mid.index(".")])
+
+ items = [
+ {
+ "mid": mid,
+ "byte_size": path.getsize(maildir.get_filename(mid)),
+ "unread": "S" not in msg.get_flags(),
+ "rec_date": datetime.fromtimestamp(_get_rcv_time(mid)).isoformat(),
+ "header": self._get_head_info(msg),
+ }
+ for mid, msg in msgs
+ ]
+ return {"mail_heads": items}
+
+ def Stats(self, folder):
+ if self.maildir is None:
+ raise NotInitializedError()
+
+ maildir = self.maildir
+ if folder:
+ maildir = maildir.get_folder(folder)
+
+ maildir.set_msgtype("None")
+
+ return dict(
+ mail_count=len(maildir),
+ unread_count=len([1 for m in maildir if "S" in m.get_flags()]),
+ byte_size=sum(
+ path.getsize(maildir.get_filename(mid)) for mid in maildir.keys()
+ ),
)
- for mid, msg in msgs
- ]
- return jwebmail.ListResp(mail_heads=items).SerializeToString()
+ def Show(self, folder, mid):
+ if self.maildir is None:
+ raise NotInitializedError()
-def count_mails(f, req):
- r = jwebmail.StatsReq()
- r.ParseFromString(req)
- if r.folder:
- f = f.get_folder(r.folder)
+ maildir = self.maildir
+ if folder:
+ maildir = maildir.get_folder(folder)
- f.set_msgtype("None")
+ maildir.set_msgtype("EmailMessage")
- resp = jwebmail.StatsResp(
- mail_count=len(f),
- unread_count=len([1 for m in f if "S" in m.get_flags()]),
- byte_size=sum(path.getsize(f.get_filename(mid)) for mid in f.keys()),
- )
- return resp.SerializeToString()
+ msg = maildir.get(mid, None)
+ if not msg:
+ raise InvalidMIDError(folder, mid)
+ maildir.set_msgtype("MaildirMessage")
-def _get_body(mail):
- if not mail.is_multipart():
- if mail.get_content_maintype() == "text":
- return jwebmail.MailBody(discrete=mail.get_content())
- else:
- ret = mail.get_content()
- if ret.isascii():
- return jwebmail.MailBody(discrete=ret.decode(encoding="ascii"))
- elif len(ret) <= 128 * 1024:
- return jwebmail.MailBody(
- discrete=b64encode(ret).decode(encoding="ascii")
- )
- else:
- raise QMAuthError(
- "non attachment part too large (>512kB)", size=len(ret)
- )
+ maildir[mid].add_flag("S")
- if (mctype := mail.get_content_maintype()) == "message":
- msg = mail.get_content()
- return jwebmail.MailBody(
- mail=jwebmail.Mail(head=_get_head_info(msg), body=_get_body(msg))
- )
- elif mctype == "multipart":
- ret = jwebmail.MailBody.Multipart(
- preamble=mail.preamble,
- epilogue=mail.epilogue,
- )
- for part in mail.iter_parts():
- head = _get_mime_head_info(part)
- if (
- head.contentdispo
- != jwebmail.MIMEHeader.ContentDisposition.CONTENT_DISPOSITION_ATTACHMENT
- ):
- body = _get_body(part)
+ return {
+ "mail": dict(
+ head=self._get_head_info(msg),
+ body=self._get_body(msg),
+ )
+ }
+
+ def Raw(self, folder, mid, path):
+ if self.maildir is None:
+ raise NotInitializedError()
+
+ maildir = self.maildir
+ if folder:
+ maildir = maildir.get_folder(folder)
+
+ maildir.set_msgtype("EmailMessage")
+
+ msg = maildir.get(mid, None)
+ if not msg:
+ raise InvalidMIDError(folder, mid)
+
+ pth = [int(seg) for seg in path.split(".")] if path else []
+ h = dict(mime_type={"main_type": "message", "sub_type": "rfc822"})
+ b = msg
+
+ for n in pth:
+ mctype = h["mime_type"]["main_type"]
+
+ if mctype == "multipart":
+ try:
+ res = next(islice(b, n, None))
+ except StopIteration:
+ raise InvalidPathInMailError(folder, mid, pth)
+ (h, b) = self._descent(res)
+ elif mctype == "message":
+ assert n == 0
+ (h, b) = self._descent(b)
else:
- body = None
- ret.parts.append(
- jwebmail.MIMEPart(
- mime_header=head,
- body=body,
+ raise InvalidPathInMailError(
+ f"can not descent into non multipart content type {mctype}"
)
- )
- return jwebmail.MailBody(multipart=ret)
- else:
- raise ValueError(f"unknown major content-type {mctype!r}")
-
-
-def read_mail(f, req):
- r = jwebmail.ShowReq()
- r.ParseFromString(req)
- if r.folder:
- f = f.get_folder(r.folder)
+ if hasattr(b, "__next__"):
+ # can not stop at multipart section
+ raise InvalidPathInMailError(folder=folder, mid=mid, path=pth)
+ elif isinstance(b, str):
+ b = b.encode()
+ elif isinstance(b, EmailMessage):
+ b = b.as_bytes()
- f.set_msgtype("EmailMessage")
+ return dict(header=h, body=b64encode(b).decode("ASCII"))
- msg = f.get(r.mid, None)
- if not msg:
- raise QMAuthError("no such message", mid=r.mid)
+ def Search(self, folder, pattern):
+ if self.maildir is None:
+ raise NotInitializedError()
- f.set_msgtype("MaildirMessage")
+ maildir = self.maildir
+ if folder:
+ maildir = maildir.get_folder(folder)
- f[r.mid].add_flag("S")
+ maildir.set_msgtype("EmailMessage")
- res = jwebmail.Mail(
- head=_get_head_info(msg),
- body=_get_body(msg),
- )
- return jwebmail.ShowResp(mail=res).SerializeToString()
+ res = [
+ dict(header=self._get_head_info(msg))
+ for msg in maildir.values()
+ if self._matches(msg, pattern)
+ ]
+ return {"found": res}
+ def Folders(self):
+ if self.maildir is None:
+ raise NotInitializedError()
-def _descent(xx):
- head = _get_mime_head_info(xx)
- if (mctype := head.maintype) == "message":
- body = xx.get_content()
- elif mctype == "multipart":
- body = xx.iter_parts()
- else:
- body = xx.get_content()
- return head, body
+ return {"folders": self.maildir.list_folders()}
+ def Move(self, mid, from_folder, to_folder):
+ if self.maildir is None:
+ raise NotInitializedError()
-def raw_mail(f, req):
- r = jwebmail.RawReq()
- r.ParseFromString(req)
+ maildir = self.maildir
+ if from_folder:
+ maildir = maildir.get_folder(from_folder)
- if r.folder:
- f = f.get_folder(r.folder)
+ fname = Path(maildir.get_filename(mid))
- f.set_msgtype("EmailMessage")
+ assert to_folder in self.maildir.list_folders() or to_folder == ""
- msg = f.get(r.mid, None)
- if not msg:
- raise QMAuthError("no such message", mid=r.mid)
+ sep = -2 if not from_folder else -3
- pth = [int(seg) for seg in r.path.split(".")] if r.path else []
- h = jwebmail.MIMEHeader(maintype="message", subtype="rfc822")
- b = msg
-
- for n in pth:
- mctype = h.maintype
-
- if mctype == "multipart":
- try:
- res = next(islice(b, n, None))
- except StopIteration:
- raise QMAuthError("out of bounds path for mail", path=pth)
- (h, b) = _descent(res)
- elif mctype == "message":
- assert n == 0
- (h, b) = _descent(b)
+ if to_folder:
+ res = fname.parts[:sep] + ("." + to_folder,) + fname.parts[-2:]
else:
- raise QMAuthError(
- f"can not descent into non multipart content type {mctype}"
- )
+ res = fname.parts[:sep] + fname.parts[-2:]
- if hasattr(b, "__next__"):
- raise QMAuthError("can not stop at multipart section", path=pth)
- elif isinstance(b, str):
- b = b.encode()
- elif isinstance(b, EmailMessage):
- b = b.as_bytes()
+ fname.rename(Path(*res))
- return jwebmail.RawResp(header=h, body=b).SerializeToString()
+ def Remove(self, folder, mid):
+ if self.maildir is None:
+ raise NotInitializedError()
+ maildir = self.maildir
+ if folder:
+ maildir = maildir.get_folder(folder)
-def _matches(m, pattern):
- if m.is_multipart():
- return any(
- 1
- for part in m.body.parts
- if re.search(pattern, part.decoded()) or re.search(pattern, part.subject)
- )
- return re.search(pattern, m.body.decoded()) or re.search(pattern, m.subject)
+ maildir.set_msgtype("MaildirMessage")
+ maildir[mid].add_flag("T")
-def search_mails(f, req):
- r = jwebmail.SearchReq()
- r.ParseFromString(req)
+ def AddFolder(self, name):
+ if self.maildir is None:
+ raise NotInitializedError()
- if r.folder:
- f = f.get_folder(r.folder)
-
- f.set_msgtype("EmailMessage")
-
- res = [
- jwebmail.ListMailHeader(
- header=_get_head_info(msg),
+ name = path.join(
+ self.maildir._path, "." + name.translate(str.maketrans("/", "."))
)
- for msg in f.values()
- if _matches(msg, r.pattern)
- ]
- return jwebmail.SearchResp(found=res).SerializeToString()
-
-def folders(f, req):
- r = jwebmail.FoldersReq()
- r.ParseFromString(req)
- return jwebmail.FoldersResp(folders=f.list_folders()).SerializeToString()
-
-
-def move_mail(f, req):
- r = jwebmail.MoveReq()
- r.ParseFromString(req)
-
- if r.from_f:
- f = f.get_folder(r.from_f)
-
- fname = Path(f.get_filename(r.mid))
-
- assert r.to_f in f.list_folders() or r.to_f == ""
-
- sep = -2 if not r.from_f else -3
-
- if r.to_f:
- res = fname.parts[:sep] + ("." + r.to_f,) + fname.parts[-2:]
- else:
- res = fname.parts[:sep] + fname.parts[-2:]
-
- fname.rename(Path(*res))
-
- return jwebmail.MoveResp().SerializeToString()
-
-
-def remove_mail(f, req):
- r = jwebmail.RemoveReq()
- r.ParseFromString(req)
-
- if r.folder:
- f = f.get_folder(r.folder)
-
- f.set_msgtype("MaildirMessage")
+ if path.isdir(name):
+ return {"status": "skipped"}
+
+ mkdir(name)
+ mkdir(path.join(name, "cur"))
+ mkdir(path.join(name, "new"))
+ mkdir(path.join(name, "tmp"))
+
+ return {"status": "created"}
+
+ @staticmethod
+ def _adr(addrs):
+ if addrs is None:
+ return []
+
+ ret = []
+ for addr in addrs.addresses:
+ struct = {"address": addr.addr_spec}
+ if addr.display_name:
+ struct["name"] = addr.display_name
+ ret.append(struct)
+ return ret
+
+ @staticmethod
+ def _get_mime_head_info(msg):
+ mh = dict(
+ mime_type={
+ "main_type": msg.get_content_maintype(),
+ "sub_type": msg.get_content_subtype(),
+ }
+ )
+ if (cd := msg.get_content_disposition()) == "inline":
+ mh["content_dispo"] = "inline"
+ elif cd == "attachment":
+ mh["content_dispo"] = "attachment"
+ elif cd is None:
+ mh["content_dispo"] = "none"
+ else:
+ assert False
+
+ if fn := msg.get_filename():
+ mh["file_name"] = fn
+
+ return mh
+
+ def _get_head_info(self, msg):
+ mh = dict(
+ send_date=msg["date"].datetime.isoformat(),
+ written_from=self._adr(msg["from"]),
+ reply_to=self._adr(msg["reply-to"]),
+ send_to=self._adr(msg["to"]),
+ cc=self._adr(msg["cc"]),
+ bcc=self._adr(msg["bcc"]),
+ subject=msg["subject"],
+ comments=msg["comments"],
+ keywords=msg["keywords"],
+ mime=self._get_mime_head_info(msg),
+ )
- f[r.mid].add_flag("T")
+ if s := self._adr(msg["sender"]):
+ mh.sender = s[0]
- return jwebmail.RemoveResp().SerializeToString()
+ return mh
+ def _get_body(self, mail):
+ if not mail.is_multipart():
+ if mail.get_content_maintype() == "text":
+ return dict(discrete=mail.get_content())
+ else:
+ ret = mail.get_content()
+ if ret.isascii():
+ return dict(discrete=ret.decode(encoding="ASCII"))
+ elif len(ret) <= 128 * 1024:
+ return dict(discrete=b64encode(ret).decode(encoding="ASCII"))
+ else:
+ raise ValueError(
+ "non attachment part too large (>512kB)", size=len(ret)
+ )
-def add_folder(f, req):
- r = jwebmail.AddFolderReq()
- r.ParseFromString(req)
+ if (mctype := mail.get_content_maintype()) == "message":
+ msg = mail.get_content()
+ return dict(
+ mail=dict(head=self._get_head_info(msg), body=self._get_body(msg))
+ )
+ elif mctype == "multipart":
+ ret = dict(
+ preamble=mail.preamble,
+ parts=[],
+ epilogue=mail.epilogue,
+ )
+ for part in mail.iter_parts():
+ head = self._get_mime_head_info(part)
+ if head["content_dispo"] != "attachment":
+ body = self._get_body(part)
+ else:
+ body = None
+ ret["parts"].append(dict(mime_header=head, body=body))
+ return dict(multipart=ret)
+ else:
+ raise ValueError(f"unknown major content-type {mctype!r}")
+
+ def _descent(self, xx):
+ head = self._get_mime_head_info(xx)
+ if (mctype := head["mime_type"]["main_type"]) == "message":
+ body = xx.get_content()
+ elif mctype == "multipart":
+ body = xx.iter_parts()
+ else:
+ body = xx.get_content()
+ return head, body
+
+ @staticmethod
+ def _matches(m, pattern):
+ if m.is_multipart():
+ return any(
+ 1
+ for part in m.body.parts
+ if re.search(pattern, part.decoded())
+ or re.search(pattern, part.subject)
+ )
+ return re.search(pattern, m.body.decoded()) or re.search(pattern, m.subject)
- name = path.join(f._path, "." + r.name.translate(str.maketrans("/", ".")))
+ @staticmethod
+ def _sort_mails(f, sort):
- if path.isdir(name):
- return jwebmail.AddFolderResp(status=1).SerializeToString()
+ match sort["direction"]:
+ case "asc":
+ reverse = False
+ case "desc":
+ pass
+ reverse = True
+ case sort_direct:
+ raise ValueError(f"unknown sort direction {sort_direct!r}")
- mkdir(name)
- mkdir(path.join(name, "cur"))
- mkdir(path.join(name, "new"))
- mkdir(path.join(name, "tmp"))
+ def _sort_by_sender(midmsg):
+ _, msg = midmsg
- return jwebmail.AddFolderResp(status=0).SerializeToString()
+ if len(addrs := msg["from"].addresses) == 1:
+ return addrs[0].addr_spec
+ else:
+ return msg["sender"].address.addr_spec
+ def by_rec_date(midmsg):
+ return float(re.match(r"\d+\.\d+", midmsg[0], re.ASCII)[0])
-def method_to_run(value):
- match value:
- case "list":
- return list_mails
- case "count":
- return count_mails
- case "read":
- return read_mail
- case "raw":
- return raw_mail
- case "folders":
- return folders
- case "move":
- return move_mail
- case "remove":
- return remove_mail
- case "search":
- return search_mails
- case "add-folder":
- return add_folder
- case _:
- raise ValueError(value)
+ match sort["parameter"]:
+ case "date":
+ keyfn = by_rec_date
+ case "sender":
+ keyfn = _sort_by_sender
+ case "subject":
+ # fmt: off
+ def keyfn(midmsg): return midmsg[1]["subject"]
+ # fmt: on
+ case "size":
+ # fmt: off
+ def keyfn(midmsg): return path.getsize(f.get_filename(midmsg[0]))
+ # fmt: on
+ case sort_param:
+ logging.warning("unknown sort-verb %r", sort_param)
+ reverse = False
+ keyfn = by_rec_date
+ return keyfn, reverse
-def parse_arguments():
- ap = ArgumentParser(allow_abbrev=False)
- ap.add_argument("maildir_path", type=Path)
- ap.add_argument("os_user")
- ap.add_argument("mail_user")
- ap.add_argument("method", type=method_to_run)
- return vars(ap.parse_args())
+class RequestHandler(varlink.RequestHandler):
+ service = service
def main():
@@ -545,25 +557,20 @@ def main():
level="INFO",
format="%(levelname)s:" + str(getpid()) + ":%(message)s",
)
- args = parse_arguments()
- logging.debug("started with %s", args)
- s = startup(
- args["maildir_path"],
- args["os_user"],
- args["mail_user"],
- args["method"],
- )
- logging.debug("setuid successful")
- stdout.write("OPEN\n")
- stdout.flush()
- val = stdin.buffer.read()
- run = args["method"]
- reply = run(s, val)
- logging.debug("pb method(%s) size(%d)", args["method"], len(reply))
- stdout.buffer.write(reply)
- # except QMAuthError as qerr:
- # errmsg = dict(error=qerr.msg, **qerr.info)
- # sysexit(3)
+
+ if environ["LISTEN_FDS"] == "1":
+ sok = socket.fromfd(3, socket.AF_UNIX, socket.SOCK_STREAM)
+ RequestHandler(sok, None, None)
+ return
+
+ types = environ["LISTEN_FDNAMES"].split(":")
+ assert len(types) == int(environ["LISTEN_FDS"])
+
+ for i, typ in enumerate(types):
+ if typ == "varlink":
+ sok = socket.fromfd(3 + i, socket.AF_UNIX, socket.SOCK_STREAM)
+ RequestHandler(sok, None, None)
+ sok.close()
except Exception:
logging.exception("qmauth.py error")
sysexit(4)