import email.utils
import mimetypes
from email.message import EmailMessage
from io import BytesIO
from smtplib import (
SMTP_SSL,
SMTPAuthenticationError,
SMTPException,
SMTPHeloError,
SMTPNotSupportedError,
)
from urllib.parse import urlparse
from flask import abort, current_app, flash, redirect, render_template, request, url_for
from flask_babel import gettext, lazy_gettext
from flask_login import current_user, logout_user
from flask_paginate import Pagination, get_page_parameter, get_per_page_parameter
from flask_wtf import FlaskForm
from werkzeug.utils import secure_filename
from wtforms import (
EmailField,
MultipleFileField,
PasswordField,
StringField,
SubmitField,
TextAreaField,
validators,
)
from .model.read_mails import QMAuthError
from .read_mails import get_read_mails_logged_in
from .read_mails import login as rm_login
from .render_mail import to_mime_type
class LoginForm(FlaskForm):
username = StringField(lazy_gettext("Username"), [validators.Email()])
password = PasswordField(
lazy_gettext("Password"), [validators.Length(min=5, max=120)]
)
class WriteForm(FlaskForm):
send_to = EmailField("Send to", [validators.InputRequired()])
subject = StringField("Subject", [validators.InputRequired()])
cc = StringField("CC")
bcc = StringField("BCC")
answer_to = EmailField("Answer to")
content = TextAreaField("Content")
attachments = MultipleFileField("Attachments")
submit = SubmitField("Send")
def login():
if current_user.is_authenticated:
return redirect(url_for("displayheaders"), 307)
form = LoginForm()
warn = ""
rc = 200
if form.validate_on_submit():
if rm_login(form.username.data, form.password.data):
nxt = request.args.get("next")
if urlparse(nxt).netloc:
abort(401)
return redirect(nxt or url_for("displayheaders"), 303)
else:
warn = gettext("login failed!")
elif request.method == "POST":
rc = 401
return render_template("login.html", login_form=form, warn=warn), rc
def logout():
logout_user()
return redirect(url_for("login"), 303)
def about():
view_model = {
"scriptadmin": current_app.config["JWEBMAIL"]["ADMIN_MAIL"],
"http_host": request.host,
"request_uri": request.full_path,
"remote_addr": request.remote_addr,
"languages": current_app.extensions["babel"].instance.list_translations(),
}
return render_template("about.html", **view_model)
def displayheaders(folder=""):
folders = get_read_mails_logged_in().folders()
if folder and folder not in folders:
return render_template("error", error="no_folder", links=folders), 404
sort = request.args.get("sort", "!date")
search = request.args.get("search")
s = sort[1:] if sort[0] == "!" else sort
if s not in ["date", "size", "sender"]:
abort(400)
count = get_read_mails_logged_in().count(folder)
page = request.args.get(get_page_parameter(), type=int, default=1)
per_page = request.args.get(get_per_page_parameter(), type=int, default=25)
pgn = Pagination(
page=page,
per_page=per_page,
total=count["total_mails"],
record_name="mails",
css_framework="bulma",
display_msg=gettext(
"displaying {start} - {end} of {total} {record_name}"
),
inner_window=1,
outer_window=0,
)
if search:
headers = get_read_mails_logged_in().search(search, folder)
else:
headers = get_read_mails_logged_in().read_headers_for(
folder=folder,
start=(pgn.page - 1) * pgn.per_page,
end=(pgn.page - 1) * pgn.per_page + pgn.per_page,
sort=sort,
)
vals = {
"folder": folder,
"pgn": pgn,
"msgs": headers,
"mail_folders": folders,
"total_size": count["byte_size"],
"total_new_mails": count["unread_mails"],
}
return render_template("displayheaders.html", **vals)
def readmail(msgid, folder=""):
format = request.args.get("format", "html").lower()
if format == "html":
try:
mail = get_read_mails_logged_in().show(folder, msgid)
except QMAuthError:
return render_template("not_found.html"), 404
return render_template("readmail.html", msg=mail, folder=folder)
elif format == "raw":
path = request.args.get("path", "")
content = get_read_mails_logged_in().raw(folder, msgid, path)
headers = []
cd = content["head"].get("content_disposition")
if cd and cd.lower() == "attachment":
headers.append(
(
"Content-Disposition",
f"attachment; filename={content['head']['filename']}",
)
)
ct = to_mime_type(content["head"])
if ct.startswith("text/"):
ct += "; charset=UTF-8"
headers.append(("Content-Type", ct))
return content["body"], headers
elif format == "json":
mail = get_read_mails_logged_in().show(folder, msgid)
return mail
else:
abort(404)
def writemail():
return render_template("writemail.html", form=WriteForm())
def _take_common_req_args(mapping):
print(mapping)
res = dict()
if mapping.get("sort"):
res["sort"] = mapping["sort"]
if mapping.get("per_page"):
res["per_page"] = mapping["per_page"]
if mapping.get("page"):
res["page"] = mapping["page"]
return res
def move(folder=""):
folders = get_read_mails_logged_in().folders()
mm = request.form.getlist("mail")
to_folder = request.form["select-folder"]
if folder not in folders or to_folder not in folders:
raise ValueError("folder not valid")
for m in mm:
get_read_mails_logged_in().move(m, folder, to_folder)
flash(gettext("succ_move"))
args = _take_common_req_args(request.form)
return redirect(url_for("displayheaders", folder=folder, **args), 303)
def remove(folder=""):
folders = get_read_mails_logged_in().add_folder("Trash")
mm = request.form.getlist("mail")
folders = get_read_mails_logged_in().folders()
if folder not in folders:
raise ValueError("folder not valid")
if folder == "Trash":
for m in mm:
get_read_mails_logged_in().remove(m, folder)
else:
for m in mm:
get_read_mails_logged_in().move(m, folder, "Trash")
flash(gettext("succ_remove"))
args = _take_common_req_args(request.form)
return redirect(url_for("displayheaders", folder=folder, **args), 303)
def sendmail():
form = WriteForm()
if not form.validate():
abort(400)
mail = EmailMessage()
mail["From"] = current_user.id
mail["To"] = form.send_to.data
mail["Subject"] = form.subject.data
mail["Date"] = email.utils.localtime()
mail["Message-ID"] = email.utils.make_msgid()
if form.cc.data:
mail["Cc"] = form.cc.data
if form.bcc.data:
mail["Bcc"] = form.bcc.data
if form.answer_to.data:
mail["Reply"] = form.answer_to.data
mail.set_content(form.content.data)
for f in form.attachments.data:
if not f.filename:
continue
bio = BytesIO()
f.save(bio)
fname = secure_filename(f.filename)
mt, _ce = mimetypes.guess_type(fname)
if mt is None:
abort(400, gettext("invalid mimetype"))
maintype, subtype = mt.split("/")
mail.add_attachment(
bio.getvalue(), maintype, subtype, cte="base64", filename=fname
)
if current_app.config["JWEBMAIL"]["WRITE_MAILS"].get("DEBUG"):
print(mail.as_string())
flash(gettext("succ_send"))
return redirect(url_for("displayheaders"), 303)
_, send_server = current_user.id.split("@")
send_server = "mail." + send_server
try:
with SMTP_SSL(send_server) as smtp:
smtp.login(current_user.id, current_user.password)
smtp.send_message(mail)
except (
SMTPHeloError,
SMTPAuthenticationError,
SMTPNotSupportedError,
SMTPException,
):
return (
render_template("writemail.html", form=form, warning=gettext("error_send")),
400,
)
flash(gettext("succ_send"))
return redirect(url_for("displayheaders"), 303)