"""Flask view functions and forms for the webmail front end.

Contains the login/logout views, the mail listing, reading, moving and
raw-download views, and the forms used for logging in and composing mail.
"""

from urllib.parse import quote, urlparse

from flask import (
    abort,
    current_app,
    flash,
    g,
    redirect,
    render_template,
    request,
    url_for,
)
from flask_babel import get_locale, gettext, lazy_gettext
from flask_login import current_user, login_user, logout_user
from flask_paginate import Pagination, get_page_parameter, get_per_page_parameter
from flask_wtf import FlaskForm
from wtforms import (
    EmailField,
    MultipleFileField,
    PasswordField,
    StringField,
    SubmitField,
    TextAreaField,
    validators,
)

from .model.read_mails import QMAuthError
from .read_mails import JWebmailUser, add_user, get_read_mails_logged_in
from .read_mails import login as rm_login
from .render_mail import to_mime_type


class LoginForm(FlaskForm):
    """Login form: an e-mail address as username plus a 5-35 character password."""

    username = StringField(lazy_gettext("Username"), [validators.Email()])
    password = PasswordField(
        lazy_gettext("Password"), [validators.Length(min=5, max=35)]
    )


class WriteForm(FlaskForm):
    """Form for composing a mail; recipient and subject are required."""

    send_to = EmailField("Send to", [validators.InputRequired()])
    subject = StringField("Subject", [validators.InputRequired()])
    cc = StringField("CC")
    bcc = StringField("BCC")
    answer_to = EmailField("Answer to")
    content = TextAreaField("Content")
    attachments = MultipleFileField("Attachments")
    submit = SubmitField("Send")


def _is_local_redirect_target(target):
    """Return True if *target* carries neither a scheme nor a host.

    Backslashes are treated like forward slashes before parsing because
    browsers interpret ``/\\host`` the same way as ``//host``.
    """
    parts = urlparse(target.strip().replace("\\", "/"))
    return not parts.scheme and not parts.netloc


def _attachment_disposition(filename):
    """Build the value of a ``Content-Disposition: attachment`` header.

    Args:
        filename: Suggested file name; may be empty or ``None``.

    Returns:
        ``"attachment"`` when no usable name is available. Otherwise the
        value carries a quoted ASCII ``filename`` parameter and, for names
        with non-ASCII characters, an additional percent-encoded UTF-8
        ``filename*`` parameter.
    """
    if not filename:
        return "attachment"

    # Control characters (including CR/LF) must never reach a header value.
    cleaned = "".join(ch for ch in str(filename) if ch.isprintable())
    if not cleaned:
        return "attachment"

    fallback = cleaned.encode("ascii", "replace").decode("ascii")
    fallback = fallback.replace("\\", "\\\\").replace('"', '\\"')
    value = f'attachment; filename="{fallback}"'
    if not cleaned.isascii():
        value += f"; filename*=UTF-8''{quote(cleaned, safe='')}"
    return value


def login():
    """Show the login form and authenticate the user on submission.

    An already authenticated user is redirected to ``displayheaders`` (307).
    On a valid submission the credentials are checked with ``rm_login``; on
    success the user is registered through ``add_user`` and ``login_user``
    and redirected (303) to the ``next`` query argument, falling back to
    ``displayheaders``. A ``next`` value that points to another host or
    carries a scheme is rejected with 401.

    Returns:
        A redirect response, or a ``(rendered "login.html", status)`` tuple
        whose status is 401 for a POST that failed validation or
        authentication and 200 otherwise.
    """
    if current_user.is_authenticated:
        return redirect(url_for("displayheaders"), 307)

    form = LoginForm()
    warn = ""
    rc = 200

    if form.validate_on_submit():
        if rm_login(form.username.data, form.password.data):
            user = JWebmailUser(form.username.data, form.password.data)
            add_user(user)
            login_user(user)

            nxt = request.args.get("next")
            # Only follow redirects that stay on this site (no open redirect).
            if nxt and not _is_local_redirect_target(nxt):
                abort(401)
            return redirect(
                nxt or url_for("displayheaders", lang_code=get_locale().language),
                303,
            )
        warn = gettext("login failed!")
        # Rejected credentials get the same status as a rejected form.
        rc = 401
    elif request.method == "POST":
        rc = 401

    return render_template("login.html", login_form=form, warn=warn), rc


def logout():
    """Log the current user out and redirect (303) to the login view."""
    logout_user()
    return redirect(url_for("login"), 303)


def about():
    """Render ``about.html`` with request details and configured values."""
    view_model = {
        "scriptadmin": current_app.config["JWEBMAIL"]["ADMIN_MAIL"],
        "http_host": request.host,
        "request_uri": request.full_path,
        "remote_addr": request.remote_addr,
        "languages": current_app.extensions["babel"].instance.list_translations(),
    }
    return render_template("about.html", **view_model)


def displayheaders(folder=""):
    """List the mail headers of *folder*, paginated, sorted or searched.

    Query arguments:
        sort: ``date``, ``size`` or ``sender``, optionally prefixed with
            ``!``; defaults to ``!date``. Any other value aborts with 400.
        search: When given, the result of ``search`` is shown instead of the
            paginated header listing.
        page / per-page parameters of flask-paginate; values below 1 are
            raised to 1.

    Args:
        folder: Folder name; the empty string is passed through unchanged.

    Returns:
        The rendered ``displayheaders.html``, or an error page with status
        404 when *folder* is non-empty and not among the user's folders.
    """
    folders = get_read_mails_logged_in().folders()
    if folder and folder not in folders:
        # NOTE(review): every other template in this module is referenced
        # with a ".html" suffix - confirm "error" is the intended name.
        return render_template("error", error="no_folder", links=folders), 404

    sort = request.args.get("sort", "!date")
    search = request.args.get("search")

    # startswith() copes with an empty "sort" value, which then fails the
    # whitelist check below instead of raising IndexError.
    sort_key = sort[1:] if sort.startswith("!") else sort
    if sort_key not in ("date", "size", "sender"):
        abort(400)

    count = get_read_mails_logged_in().count(folder)

    # Clamp to sane minimums so zero or negative values from the query
    # string cannot produce negative offsets or a division by zero.
    page = max(request.args.get(get_page_parameter(), type=int, default=1), 1)
    per_page = max(
        request.args.get(get_per_page_parameter(), type=int, default=10), 1
    )

    pgn = Pagination(
        page=page,
        per_page=per_page,
        total=count["total_mails"],
        record_name="mails",
        css_framework="bulma",
        display_msg=gettext(
            "displaying {start} - {end} of {total} {record_name}"
        ),
        inner_window=1,
        outer_window=0,
    )

    if search:
        headers = get_read_mails_logged_in().search(search, folder)
    else:
        offset = (pgn.page - 1) * pgn.per_page
        headers = get_read_mails_logged_in().read_headers_for(
            folder=folder,
            start=offset,
            end=offset + pgn.per_page,
            sort=sort,
        )

    vals = {
        "folder": folder,
        "pgn": pgn,
        "msgs": headers,
        "mail_folders": folders,
        "total_size": count["byte_size"],
        "total_new_mails": count["unread_mails"],
    }
    return render_template("displayheaders.html", **vals)


def readmail(msgid, folder=""):
    """Render a single mail.

    Returns:
        The rendered ``readmail.html``, or ``not_found.html`` with status 404
        when fetching the mail raises ``QMAuthError``.
    """
    try:
        mail = get_read_mails_logged_in().show(folder, msgid)
    except QMAuthError:
        return render_template("not_found.html"), 404
    return render_template("readmail.html", msg=mail, folder=folder)


def writemail():
    """Render the compose page with an empty ``WriteForm``."""
    return render_template("writemail.html", form=WriteForm())


def move(folder):
    """Move the mails named in the query string from *folder* to another one.

    Query arguments:
        mail: May be repeated; each value is one mail to move.
        folder: Destination folder (required).

    Raises:
        ValueError: If the source or destination folder is not one of the
            user's folders.

    Returns:
        A redirect (303) to ``displayheaders`` after flashing a success
        message.
    """
    folders = get_read_mails_logged_in().folders()
    mm = request.args.getlist("mail")
    to_folder = request.args["folder"]

    if folder not in folders or to_folder not in folders:
        raise ValueError("folder not valid")

    for m in mm:
        get_read_mails_logged_in().move(m, folder, to_folder)

    flash(gettext("succ_move"))
    return redirect(url_for("displayheaders"), 303)


def rawmail(msgid, folder=""):
    """Return the raw content of a mail, or of the part named by ``path``.

    The ``path`` query argument (default: empty string) is passed on to
    ``raw``. When the part's ``content_disposition`` is ``attachment`` a
    ``Content-Disposition`` header with a safely quoted file name is added.
    ``text/*`` content types are sent with ``charset=UTF-8``.

    Returns:
        A ``(body, headers)`` tuple, ``headers`` being a list of
        ``(name, value)`` pairs.
    """
    path = request.args.get("path", "")
    content = get_read_mails_logged_in().raw(folder, msgid, path)
    head = content["head"]

    headers = []
    cd = head.get("content_disposition")
    if cd and cd.lower() == "attachment":
        headers.append(
            ("Content-Disposition", _attachment_disposition(head.get("filename")))
        )

    ct = to_mime_type(head)
    if ct.startswith("text/"):
        ct += "; charset=UTF-8"
    headers.append(("Content-Type", ct))

    return content["body"], headers


def sendmail():
    """Validate the submitted ``WriteForm`` and hand the mail over for sending.

    Aborts with 400 when the form does not validate.

    Returns:
        A redirect (303) to ``displayheaders`` on success, or the re-rendered
        ``writemail.html`` with a warning and status 400 when sending
        reports an error.
    """
    form = WriteForm()
    if not form.validate():
        abort(400)

    mail = {
        # The recipient field on WriteForm is named "send_to".
        "to": form.send_to.data,
        "message": form.content.data,
        "subject": form.subject.data,
        "cc": form.cc.data,
        "bcc": form.bcc.data,
        "reply": form.answer_to.data,
        "attach": form.attachments.data,
        "from": "",
    }

    # NOTE(review): `send_mail` is neither imported nor defined in the
    # visible part of this module - confirm where it is meant to come from.
    error = send_mail(mail)
    if error:
        # Pass the form back so the template can re-render the user's input.
        return (
            render_template(
                "writemail.html", form=form, warning=gettext("error_send")
            ),
            400,
        )

    flash(gettext("succ_send"))
    return redirect(url_for("displayheaders"), 303)