import email.utils import mimetypes from email.message import EmailMessage from io import BytesIO from smtplib import ( SMTP_SSL, SMTPAuthenticationError, SMTPException, SMTPHeloError, SMTPNotSupportedError, ) from urllib.parse import urlparse from flask import abort, current_app, flash, redirect, render_template, request, url_for from flask_babel import gettext, lazy_gettext from flask_login import current_user, logout_user from flask_wtf import FlaskForm from werkzeug.utils import secure_filename from wtforms import ( EmailField, MultipleFileField, PasswordField, StringField, SubmitField, TextAreaField, validators, ) from .model.read_mails import QMAuthError from .read_mails import get_read_mails_logged_in from .read_mails import login as rm_login from .render_mail import to_mime_type class LoginForm(FlaskForm): username = StringField(lazy_gettext("Username"), [validators.Email()]) password = PasswordField( lazy_gettext("Password"), [validators.Length(min=5, max=120)] ) class WriteForm(FlaskForm): send_to = EmailField("Send to", [validators.InputRequired()]) subject = StringField("Subject", [validators.InputRequired()]) cc = StringField("CC") bcc = StringField("BCC") answer_to = EmailField("Answer to") content = TextAreaField("Content") attachments = MultipleFileField("Attachments") submit = SubmitField("Send") def login(): if current_user.is_authenticated: return redirect(url_for("displayheaders"), 307) form = LoginForm() warn = "" rc = 200 if form.validate_on_submit(): if rm_login(form.username.data, form.password.data): nxt = request.args.get("next") if urlparse(nxt).netloc: abort(401) return redirect(nxt or url_for("displayheaders"), 303) else: warn = gettext("login failed!") elif request.method == "POST": rc = 401 return render_template("login.html", login_form=form, warn=warn), rc def logout(): logout_user() return redirect(url_for("login"), 303) def about(): return render_template( "about.html", languages=current_app.extensions["babel"].instance.list_translations(), ) def displayheaders(folder=""): with get_read_mails_logged_in() as read_mails: folders = read_mails.folders() if folder and folder not in folders: return render_template("error", error="no_folder", links=folders), 404 page_bound = request.args.get("page_bound") page_after = bool(request.args.get("page_after", type=int, default=True)) per_page = request.args.get("per_page", type=int, default=25) sort = request.args.get("sort", "!date") search = request.args.get("search") s = sort[1:] if sort[0] == "!" else sort if s not in ["date", "size", "sender"]: abort(400) count = read_mails.count(folder) headers, first, last = read_mails.list_search( folder=folder, bound=page_bound, after=page_after, limit=per_page, sort=sort, search=search, ) if headers: match s: case "date": page_bound_after = ( headers[-1]["date_received"] + "_" + headers[-1]["message_handle"] ) page_bound_before = ( headers[0]["date_received"] + "_" + headers[0]["message_handle"] ) case "size": page_bound_after = ( str(headers[-1]["byte_size"]) + "_" + headers[-1]["message_handle"] ) page_bound_before = ( str(headers[0]["byte_size"]) + "_" + headers[0]["message_handle"] ) case "sender": page_bound_after = ( headers[-1]["head"]["from"][0]["address"] + "_" + headers[-1]["message_handle"] ) page_bound_before = ( headers[0]["head"]["from"][0]["address"] + "_" + headers[0]["message_handle"] ) else: page_bound_after = None page_bound_before = None return render_template( "displayheaders.html", first=first, last=last, page_bound_after=page_bound_after, page_bound_before=page_bound_before, msgs=headers, mail_folders=folders, total_mails=count["total_mails"], total_size=count["byte_size"], total_new_mails=count["unread_mails"], ) def readmail(msgid, folder=""): format = request.args.get("format", "html").lower() with get_read_mails_logged_in() as read_mails: if format == "html": try: mail = read_mails.show(folder, msgid) except QMAuthError: return render_template("not_found.html"), 404 return render_template("readmail.html", msg=mail, folder=folder) elif format == "raw": path = request.args.get("path", "") content = read_mails.raw(folder, msgid, path) headers = [] cd = content["head"].get("content_disposition") if cd and cd.lower() == "attachment": headers.append( ( "Content-Disposition", f"attachment; filename={content['head']['filename']}", ) ) ct = to_mime_type(content["head"]) if ct.startswith("text/"): ct += "; charset=UTF-8" headers.append(("Content-Type", ct)) return content["body"], headers elif format == "json": return read_mails.show(folder, msgid) else: abort(404) def writemail(): return render_template("writemail.html", form=WriteForm()) def _take_common_req_args(mapping): take = {"sort", "search", "per_page", "page_bound", "after_page"} return {k: v for k, v in mapping.items() if k in take} def move(folder=""): with get_read_mails_logged_in() as read_mails: folders = read_mails.folders() mm = request.form.getlist("mail") to_folder = request.form["select-folder"] if folder not in folders or to_folder not in folders: raise ValueError("folder not valid") for m in mm: read_mails.move(m, folder, to_folder) flash(gettext("succ_move")) args = _take_common_req_args(request.form) return redirect(url_for("displayheaders", folder=folder, **args), 303) def remove(folder=""): with get_read_mails_logged_in() as read_mails: folders = read_mails.add_folder("Trash") mm = request.form.getlist("mail") folders = read_mails.folders() if folder not in folders: raise ValueError("folder not valid") if folder == "Trash": for m in mm: read_mails.remove(folder, m) else: for m in mm: read_mails.move(m, folder, "Trash") flash(gettext("succ_remove")) args = _take_common_req_args(request.form) return redirect(url_for("displayheaders", folder=folder, **args), 303) def sendmail(): form = WriteForm() if not form.validate(): abort(400) mail = EmailMessage() mail["From"] = current_user.id mail["To"] = form.send_to.data mail["Subject"] = form.subject.data mail["Date"] = email.utils.localtime() mail["Message-ID"] = email.utils.make_msgid() if form.cc.data: mail["Cc"] = form.cc.data if form.bcc.data: mail["Bcc"] = form.bcc.data if form.answer_to.data: mail["Reply"] = form.answer_to.data mail.set_content(form.content.data) for f in form.attachments.data: if not f.filename: continue bio = BytesIO() f.save(bio) fname = secure_filename(f.filename) mt, _ce = mimetypes.guess_type(fname) if mt is None: abort(400, gettext("invalid mimetype")) maintype, subtype = mt.split("/") mail.add_attachment( bio.getvalue(), maintype, subtype, cte="base64", filename=fname ) if current_app.config["JWEBMAIL"]["WRITE_MAILS"].get("DEBUG"): print(mail.as_string()) flash(gettext("succ_send")) return redirect(url_for("displayheaders"), 303) _, send_server = current_user.id.split("@") send_server = "mail." + send_server try: with SMTP_SSL(send_server) as smtp: smtp.login(current_user.id, current_user.password) smtp.send_message(mail) except ( SMTPHeloError, SMTPAuthenticationError, SMTPNotSupportedError, SMTPException, ): return ( render_template("writemail.html", form=form, warning=gettext("error_send")), 400, ) flash(gettext("succ_send")) return redirect(url_for("displayheaders"), 303)