"""Forms and view functions of the webmail front end.

Covers login/logout, the about page, listing and reading mails, moving and
removing mails, and composing/sending mail over SMTP.
"""

import email.utils
import mimetypes
from email.message import EmailMessage
from io import BytesIO
from smtplib import (
    SMTP_SSL,
    SMTPAuthenticationError,
    SMTPException,
    SMTPHeloError,
    SMTPNotSupportedError,
)
from urllib.parse import urlparse

from flask import abort, current_app, flash, redirect, render_template, request, url_for
from flask_babel import gettext, lazy_gettext
from flask_login import current_user, logout_user
from flask_paginate import Pagination, get_page_parameter, get_per_page_parameter
from flask_wtf import FlaskForm
from werkzeug.utils import secure_filename
from wtforms import (
    EmailField,
    MultipleFileField,
    PasswordField,
    StringField,
    SubmitField,
    TextAreaField,
    validators,
)

from .model.read_mails import QMAuthError
from .read_mails import get_read_mails_logged_in
from .read_mails import login as rm_login
from .render_mail import to_mime_type


class LoginForm(FlaskForm):
    """Login form: an e-mail address as username plus a password."""

    username = StringField(lazy_gettext("Username"), [validators.Email()])
    password = PasswordField(
        lazy_gettext("Password"), [validators.Length(min=5, max=120)]
    )


class WriteForm(FlaskForm):
    """Form used to compose a mail, including optional attachments."""

    send_to = EmailField("Send to", [validators.InputRequired()])
    subject = StringField("Subject", [validators.InputRequired()])
    cc = StringField("CC")
    bcc = StringField("BCC")
    answer_to = EmailField("Answer to")
    content = TextAreaField("Content")
    attachments = MultipleFileField("Attachments")
    submit = SubmitField("Send")


def login():
    """Show the login form and process login attempts.

    An already authenticated user is redirected to the mail listing.  After
    a successful login the user is redirected to the ``next`` query argument
    (or the mail listing); a ``next`` value that names a host is rejected
    with 401.  A POST that fails form validation is answered with status 401.
    """
    if current_user.is_authenticated:
        return redirect(url_for("displayheaders"), 307)

    form = LoginForm()
    warn = ""
    rc = 200
    if form.validate_on_submit():
        if rm_login(form.username.data, form.password.data):
            nxt = request.args.get("next")
            # Only same-site targets are allowed; skip parsing when there is
            # no ``next`` argument at all instead of handing None to urlparse.
            if nxt and urlparse(nxt).netloc:
                abort(401)
            return redirect(nxt or url_for("displayheaders"), 303)
        warn = gettext("login failed!")
    elif request.method == "POST":
        rc = 401
    return render_template("login.html", login_form=form, warn=warn), rc


def logout():
    """Log the current user out and redirect to the login page."""
    logout_user()
    return redirect(url_for("login"), 303)


def about():
    """Render the about page with admin contact and request information."""
    view_model = {
        "scriptadmin": current_app.config["JWEBMAIL"]["ADMIN_MAIL"],
        "http_host": request.host,
        "request_uri": request.full_path,
        "remote_addr": request.remote_addr,
        "languages": current_app.extensions["babel"].instance.list_translations(),
    }
    return render_template("about.html", **view_model)


def displayheaders(folder=""):
    """List the mail headers of *folder*, paginated, sorted or searched.

    Query arguments: ``sort`` (``date``, ``size`` or ``sender``, optionally
    prefixed with ``!``), ``search``, and the flask_paginate page/per-page
    parameters.  An unknown folder yields 404, an invalid sort key 400.
    """
    folders = get_read_mails_logged_in().folders()
    if folder and folder not in folders:
        # NOTE(review): every other template name here ends in ".html";
        # confirm that a template called "error" really exists.
        return render_template("error", error="no_folder", links=folders), 404

    sort = request.args.get("sort", "!date")
    search = request.args.get("search")

    # startswith() instead of sort[0]: an empty ``?sort=`` must end up as a
    # 400 below rather than as an IndexError (HTTP 500).
    sort_key = sort[1:] if sort.startswith("!") else sort
    if sort_key not in ["date", "size", "sender"]:
        abort(400)

    count = get_read_mails_logged_in().count(folder)

    page = request.args.get(get_page_parameter(), type=int, default=1)
    per_page = request.args.get(get_per_page_parameter(), type=int, default=25)
    pgn = Pagination(
        page=page,
        per_page=per_page,
        total=count["total_mails"],
        record_name="mails",
        css_framework="bulma",
        display_msg=gettext(
            "displaying {start} - {end} of {total} {record_name}"
        ),
        inner_window=1,
        outer_window=0,
    )

    if search:
        headers = get_read_mails_logged_in().search(search, folder)
    else:
        first = (pgn.page - 1) * pgn.per_page
        headers = get_read_mails_logged_in().read_headers_for(
            folder=folder,
            start=first,
            end=first + pgn.per_page,
            sort=sort,
        )

    vals = {
        "folder": folder,
        "pgn": pgn,
        "msgs": headers,
        "mail_folders": folders,
        "total_size": count["byte_size"],
        "total_new_mails": count["unread_mails"],
    }
    return render_template("displayheaders.html", **vals)


def readmail(msgid, folder=""):
    """Return the mail *msgid* of *folder* in the requested ``format``.

    ``format=html`` (default) renders the mail, ``format=raw`` returns the
    body of the part selected by the ``path`` argument together with its
    content headers, ``format=json`` returns the mail data itself.  Any
    other format yields 404.
    """
    fmt = request.args.get("format", "html").lower()

    if fmt == "html":
        try:
            mail = get_read_mails_logged_in().show(folder, msgid)
        except QMAuthError:
            return render_template("not_found.html"), 404
        return render_template("readmail.html", msg=mail, folder=folder)

    if fmt == "raw":
        path = request.args.get("path", "")
        content = get_read_mails_logged_in().raw(folder, msgid, path)
        headers = []
        cd = content["head"].get("content_disposition")
        if cd and cd.lower() == "attachment":
            # NOTE(review): the filename comes from the mail and is inserted
            # unquoted; consider emitting it as a quoted-string.
            headers.append(
                (
                    "Content-Disposition",
                    f"attachment; filename={content['head']['filename']}",
                )
            )
        ct = to_mime_type(content["head"])
        if ct.startswith("text/"):
            ct += "; charset=UTF-8"
        headers.append(("Content-Type", ct))
        return content["body"], headers

    if fmt == "json":
        # NOTE(review): unlike the html branch, QMAuthError is not handled
        # here - confirm whether that is intended.
        mail = get_read_mails_logged_in().show(folder, msgid)
        return mail

    abort(404)


def writemail():
    """Render an empty compose form."""
    return render_template("writemail.html", form=WriteForm())


def _take_common_req_args(mapping):
    """Return the non-empty ``sort``, ``per_page`` and ``page`` entries of *mapping*."""
    return {
        key: mapping[key]
        for key in ("sort", "per_page", "page")
        if mapping.get(key)
    }


def move(folder=""):
    """Move the mails selected in the form from *folder* to the chosen folder.

    Reads the mail ids from the ``mail`` form values and the destination
    from ``select-folder``, then redirects back to the listing of *folder*.

    Raises:
        ValueError: if the source or the destination folder is unknown.
    """
    folders = get_read_mails_logged_in().folders()

    mm = request.form.getlist("mail")
    to_folder = request.form["select-folder"]
    if folder not in folders or to_folder not in folders:
        raise ValueError("folder not valid")

    for m in mm:
        get_read_mails_logged_in().move(m, folder, to_folder)

    flash(gettext("succ_move"))
    args = _take_common_req_args(request.form)
    return redirect(url_for("displayheaders", folder=folder, **args), 303)


def remove(folder=""):
    """Remove the mails selected in the form from *folder*.

    Mails in ``Trash`` are removed; mails in any other folder are moved to
    ``Trash`` instead.  Redirects back to the listing of *folder*.

    Raises:
        ValueError: if *folder* is unknown.
    """
    # Called before listing the folders, presumably so that "Trash" exists;
    # its return value was never used.
    get_read_mails_logged_in().add_folder("Trash")

    mm = request.form.getlist("mail")
    folders = get_read_mails_logged_in().folders()
    if folder not in folders:
        raise ValueError("folder not valid")

    if folder == "Trash":
        for m in mm:
            get_read_mails_logged_in().remove(m, folder)
    else:
        for m in mm:
            get_read_mails_logged_in().move(m, folder, "Trash")

    flash(gettext("succ_remove"))
    args = _take_common_req_args(request.form)
    return redirect(url_for("displayheaders", folder=folder, **args), 303)


def sendmail():
    """Build a mail from the submitted compose form and send it via SMTP.

    Invalid form data or an attachment whose MIME type cannot be guessed
    yields 400.  With ``JWEBMAIL.WRITE_MAILS.DEBUG`` set, the mail is printed
    instead of sent.  Otherwise it is sent over SMTP-over-SSL through
    ``mail.<domain of the user id>`` using the current user's credentials;
    on an SMTP error the compose form is shown again with status 400.
    """
    form = WriteForm()
    if not form.validate():
        abort(400)

    mail = EmailMessage()
    mail["From"] = current_user.id
    mail["To"] = form.send_to.data
    mail["Subject"] = form.subject.data
    mail["Date"] = email.utils.localtime()
    mail["Message-ID"] = email.utils.make_msgid()
    if form.cc.data:
        mail["Cc"] = form.cc.data
    if form.bcc.data:
        mail["Bcc"] = form.bcc.data
    if form.answer_to.data:
        # "Reply-To" is the standard header for the reply address; the
        # former "Reply" header is not honoured by mail clients.
        mail["Reply-To"] = form.answer_to.data
    mail.set_content(form.content.data)

    for f in form.attachments.data:
        if not f.filename:
            # An empty file input is submitted as a nameless upload.
            continue
        bio = BytesIO()
        f.save(bio)
        fname = secure_filename(f.filename)
        mt, _ce = mimetypes.guess_type(fname)
        if mt is None:
            abort(400, gettext("invalid mimetype"))
        maintype, subtype = mt.split("/", 1)
        mail.add_attachment(
            bio.getvalue(), maintype, subtype, cte="base64", filename=fname
        )

    if current_app.config["JWEBMAIL"]["WRITE_MAILS"].get("DEBUG"):
        # Debug mode: dump the message instead of contacting a server.
        print(mail.as_string())
        flash(gettext("succ_send"))
        return redirect(url_for("displayheaders"), 303)

    # Split on the last "@" only, so a local part containing "@" still works.
    _, send_server = current_user.id.rsplit("@", 1)
    send_server = "mail." + send_server
    try:
        with SMTP_SSL(send_server) as smtp:
            smtp.login(current_user.id, current_user.password)
            smtp.send_message(mail)
    except (
        # NOTE(review): plain OSError (DNS failure, refused connection) is
        # not covered by these SMTP exceptions and still propagates.
        SMTPHeloError,
        SMTPAuthenticationError,
        SMTPNotSupportedError,
        SMTPException,
    ):
        return (
            render_template("writemail.html", form=form, warning=gettext("error_send")),
            400,
        )

    flash(gettext("succ_send"))
    return redirect(url_for("displayheaders"), 303)