from urllib.parse import urlparse
from flask import abort, current_app, flash, redirect, render_template, request, url_for
from flask_babel import gettext, lazy_gettext
from flask_login import UserMixin, current_user, login_user, logout_user
from flask_paginate import Pagination, get_page_parameter, get_per_page_parameter
from flask_wtf import FlaskForm
from wtforms import (
EmailField,
MultipleFileField,
PasswordField,
StringField,
SubmitField,
TextAreaField,
validators,
)
from .model.read_mails import QMAuthError
from .read_mails import add_user, get_read_mails_logged_in
from .read_mails import login as rm_login
from .render_mail import to_mime_type
class JWebmailUser(UserMixin):
    """Login-session user that carries a mail address and its password.

    The mail address is stored as ``id`` and the password is kept
    unchanged on the instance.
    """

    def __init__(self, mail_addr, password):
        """Remember *mail_addr* as ``id`` together with *password*."""
        self.id, self.password = mail_addr, password
class LoginForm(FlaskForm):
    """Credentials form: an e-mail style username and a password."""

    # The username has to pass the e-mail validator.
    username = StringField(
        label=lazy_gettext("Username"),
        validators=[validators.Email()],
    )
    # The password length is limited to 5..35 characters.
    password = PasswordField(
        label=lazy_gettext("Password"),
        validators=[validators.Length(min=5, max=35)],
    )
class WriteForm(FlaskForm):
    """Form used to compose an outgoing mail.

    Recipient and subject are required; every other field is optional.
    Labels go through ``lazy_gettext`` (as in ``LoginForm``) so they can be
    translated; without a catalog entry the English text is shown unchanged.
    """

    send_to = EmailField(lazy_gettext("Send to"), [validators.InputRequired()])
    subject = StringField(lazy_gettext("Subject"), [validators.InputRequired()])
    cc = StringField(lazy_gettext("CC"))
    bcc = StringField(lazy_gettext("BCC"))
    answer_to = EmailField(lazy_gettext("Answer to"))
    content = TextAreaField(lazy_gettext("Content"))
    attachments = MultipleFileField(lazy_gettext("Attachments"))
    submit = SubmitField(lazy_gettext("Send"))
def _is_safe_redirect_target(target):
    """Return True when *target* is empty or a URL relative to this site."""
    if not target:
        return True
    # Browsers treat "\" like "/" and ignore surrounding whitespace, so
    # normalise first; otherwise "/\host" would pass the netloc check below.
    candidate = target.strip().replace("\\", "/")
    # "//host" and "///host" are resolved by browsers as a different host.
    if candidate.startswith("//"):
        return False
    try:
        parts = urlparse(candidate)
    except ValueError:
        # Malformed input (e.g. a broken IPv6 literal) is never a safe target.
        return False
    return not (parts.scheme or parts.netloc)


def login():
    """Show the login form and authenticate the submitted credentials.

    An already authenticated user is redirected (307) to ``displayheaders``.
    After a successful login the user is redirected (303) to the ``next``
    query parameter, or to ``displayheaders`` when none is given.  A ``next``
    value that does not point back to this site aborts with 401 *before* the
    user is logged in.  In every other case the login page is rendered with
    status 401, carrying a warning when the credentials were rejected.
    """
    if current_user.is_authenticated:
        return redirect(url_for("displayheaders"), 307)

    form = LoginForm()
    warn = ""
    if form.validate_on_submit():
        if rm_login(form.username.data, form.password.data):
            next_url = request.args.get("next")
            # Validate the redirect target first so that a rejected request
            # does not leave a logged-in session behind.
            if not _is_safe_redirect_target(next_url):
                abort(401)
            user = JWebmailUser(form.username.data, form.password.data)
            add_user(user)
            login_user(user)
            return redirect(next_url or url_for("displayheaders"), 303)
        warn = gettext("login failed")
    return render_template("login.html", login_form=form, warn=warn), 401
def logout():
    """Log the current user out and redirect (303) to the login view."""
    logout_user()
    login_url = url_for("login")
    return redirect(login_url, code=303)
def about():
    """Render the about page.

    The template receives the configured admin mail address, a few details
    of the current request and the translations reported by the babel
    extension.
    """
    return render_template(
        "about.html",
        scriptadmin=current_app.config["JWEBMAIL"]["ADMIN_MAIL"],
        http_host=request.host,
        request_uri=request.full_path,
        remote_addr=request.remote_addr,
        languages=current_app.extensions["babel"].instance.list_translations(),
    )
def displayheaders(folder=""):
    """List the mail headers of *folder*, paginated or filtered by a search.

    Query parameters:
        sort: ``date``, ``size`` or ``sender``, optionally prefixed with
            ``!`` (default ``!date``); anything else aborts with 400.
        search: when present, the search result is shown instead of a
            page of headers.
        page / per-page (names supplied by flask_paginate): values below 1
            are raised to 1.

    Returns the rendered header listing, or the error template with status
    404 when a non-empty *folder* is not one of the user's folders.
    """
    folders = get_read_mails_logged_in().folders()
    if folder and folder not in folders:
        # NOTE(review): every other template in this module is referenced
        # with an ".html" suffix - confirm that "error" is the intended name.
        return render_template("error", error="no_folder", links=folders), 404

    sort = request.args.get("sort", "!date")
    search = request.args.get("search")
    # startswith() instead of sort[0]: an empty "?sort=" must end in the 400
    # below rather than in an IndexError.
    sort_key = sort[1:] if sort.startswith("!") else sort
    if sort_key not in ("date", "size", "sender"):
        abort(400)

    count = get_read_mails_logged_in().count(folder)

    # Clamp to at least 1 so crafted values cannot produce negative slice
    # bounds or a zero page size.
    page = max(request.args.get(get_page_parameter(), type=int, default=1), 1)
    per_page = max(
        request.args.get(get_per_page_parameter(), type=int, default=10), 1
    )

    pgn = Pagination(
        page=page,
        per_page=per_page,
        total=count["total_mails"],
        record_name="mails",
        css_framework="bulma",
        display_msg=gettext(
            "displaying {start} - {end} of {total} {record_name}"
        ),
        inner_window=1,
        outer_window=0,
    )

    if search:
        headers = get_read_mails_logged_in().search(search, folder)
    else:
        start = (pgn.page - 1) * pgn.per_page
        headers = get_read_mails_logged_in().read_headers_for(
            folder=folder,
            start=start,
            end=start + pgn.per_page,
            sort=sort,
        )

    vals = {
        "folder": folder,
        "pgn": pgn,
        "msgs": headers,
        "mail_folders": folders,
        "total_size": count["byte_size"],
        "total_new_mails": count["unread_mails"],
    }
    return render_template("displayheaders.html", **vals)
def readmail(msgid):
    """Render the mail identified by *msgid*.

    When fetching the mail raises ``QMAuthError`` the not-found page is
    returned with status 404 instead.
    """
    try:
        message = get_read_mails_logged_in().show("", msgid)
    except QMAuthError:
        return render_template("not_found.html"), 404
    else:
        return render_template("readmail.html", msg=message)
def writemail():
    """Render the compose page with a fresh ``WriteForm``."""
    blank_form = WriteForm()
    return render_template("writemail.html", form=blank_form)
def move(folder):
    """Move the mails named by the ``mail`` query values to another folder.

    *folder* is the source folder; the destination is taken from the
    ``folder`` query parameter.  Both must be existing folders of the user,
    otherwise the request is answered with 400 - the names come from the
    client, so a bad value is a client error rather than a server fault.
    On success a message is flashed and the client is redirected (303) to
    ``displayheaders``.
    """
    folders = get_read_mails_logged_in().folders()
    mail_ids = request.args.getlist("mail")
    to_folder = request.args["folder"]

    if folder not in folders or to_folder not in folders:
        abort(400, description="folder not valid")

    for mail_id in mail_ids:
        get_read_mails_logged_in().move(mail_id, folder, to_folder)

    flash(gettext("succ_move"))
    return redirect(url_for("displayheaders"), 303)
def _attachment_disposition(filename):
    """Build a ``Content-Disposition: attachment`` value for *filename*.

    The name originates from a received mail, so it is untrusted: control
    characters are dropped, the name is emitted as a quoted string, and a
    percent-encoded ``filename*`` parameter is added when the name contains
    non-ASCII characters.  Without a usable name a bare ``attachment`` is
    returned.
    """
    from urllib.parse import quote

    if not filename:
        return "attachment"
    # Remove CR/LF and other non-printable characters before the value is
    # placed into a response header.
    name = "".join(ch for ch in str(filename) if ch.isprintable())
    if not name:
        return "attachment"

    ascii_name = name.encode("ascii", "replace").decode("ascii")
    escaped = ascii_name.replace("\\", "\\\\").replace('"', '\\"')
    value = f'attachment; filename="{escaped}"'
    if ascii_name != name:
        # Extended parameter carrying the exact UTF-8 name.
        value += f"; filename*=UTF-8''{quote(name, safe='')}"
    return value


def rawmail(msgid):
    """Return the raw body of a mail, or of one of its parts, with headers.

    The optional ``path`` query parameter is passed on to select the part.
    A ``Content-Disposition`` header is added when the part is marked as an
    attachment, and textual content types are declared as UTF-8.

    Returns a ``(body, headers)`` tuple, where ``headers`` is a list of
    ``(name, value)`` pairs.
    """
    path = request.args.get("path", "")
    content = get_read_mails_logged_in().raw("", msgid, path)
    head = content["head"]

    headers = []
    disposition = head.get("content_disposition")
    if disposition and disposition.lower() == "attachment":
        headers.append(
            (
                "Content-Disposition",
                _attachment_disposition(head.get("filename")),
            )
        )

    ct = to_mime_type(head)
    if ct.startswith("text/"):
        ct += "; charset=UTF-8"
    headers.append(("Content-Type", ct))
    return content["body"], headers
def sendmail():
    """Validate the submitted ``WriteForm`` and send the mail.

    An invalid form aborts with 400.  When sending reports an error the
    compose page is rendered again (status 400) with a warning and the
    submitted form; otherwise a success message is flashed and the client
    is redirected (303) to ``displayheaders``.
    """
    form = WriteForm()
    if not form.validate():
        abort(400)

    mail = {
        # The recipient field of WriteForm is named ``send_to``.
        "to": form.send_to.data,
        "message": form.content.data,
        "subject": form.subject.data,
        "cc": form.cc.data,
        "bcc": form.bcc.data,
        "reply": form.answer_to.data,
        "attach": form.attachments.data,
        "from": "",
    }

    # NOTE(review): send_mail is not among the imports visible at the top of
    # this module - confirm that it is brought into scope.
    error = send_mail(mail)
    if error:
        # Hand the form back to the template, as writemail() does, so the
        # page can be rendered with the user's input.
        return (
            render_template(
                "writemail.html", form=form, warning=gettext("error_send")
            ),
            400,
        )

    flash(gettext("succ_send"))
    return redirect(url_for("displayheaders"), 303)
# NOTE: The string literal below is a bare expression statement, so it has no
# effect at runtime. Its content is Perl source: a `remove` action plus the
# session password helpers `_rand_data`, `_session_passwd` and `_warn_crypt`.
# NOTE(review): presumably kept as a reference for functionality that has not
# been ported to Python yet - confirm, and drop it once it is no longer needed.
"""
sub remove {
my $self = shift;
my $v = $self->validation;
$v->csrf_protect;
$v->required('mail');
if ($v->has_error) {
$self->reply->exception('errors in ' . join('', $v->failed->@*));
return;
}
my $auth = $self->stash(STS_AUTH);
my $mm = $self->every_param('mail');
my $folder = $self->stash('folder');
$self->users->remove($auth, $folder, $_) for @$mm;
$self->res->code(303);
$self->redirect_to('displayheaders');
}
### session password handling
use constant { S_PASSWD => 'pw', S_OTP_S3D_PW => 'otp_s3d_pw' };
sub _rand_data {
my $len = shift;
if (TRUE_RANDOM) {
#return makerandom_octet(Length => $len, Strength => 0); # was used for Crypt::Random
return urandom($len);
}
else {
my $res = '';
for (0..$len-1) {
vec($res, $_, 8) = int rand 256;
}
return $res;
}
}
sub _session_passwd {
my ($self, $passwd, $challenge) = @_;
my $secAlg = LOGIN_SCHEME;
$self->_warn_crypt;
if (defined $passwd) { # set
if ($secAlg eq fc 'cram_md5') {
$self->session(S_PASSWD() => $passwd, challenge => $challenge);
}
elsif ($secAlg eq fc 'plain') {
unless ($passwd) {
$self->s3d(S_PASSWD, '');
delete $self->session->{S_OTP_S3D_PW()};
return;
}
die "'$passwd' contains invalid character \\n" if $passwd =~ /\n/;
if (length $passwd < 20) {
$passwd .= "\n" . ' ' x (20 - length($passwd) - 1);
}
my $passwd_utf8 = encode('UTF-8', $passwd);
my $rand_bytes = _rand_data(length $passwd_utf8);
$self->s3d(S_PASSWD, b64_encode($passwd_utf8 ^ $rand_bytes, ''));
$self->session(S_OTP_S3D_PW, b64_encode($rand_bytes, ''));
}
else {
die
}
}
else { # get
if ($secAlg eq fc 'cram_md5') {
wantarray or carp "you forgot the challenge";
return ($self->session(S_PASSWD), $self->session('challenge'));
}
elsif ($secAlg eq fc 'plain') {
my $pw = b64_decode($self->s3d(S_PASSWD) || '');
my $otp = b64_decode($self->session(S_OTP_S3D_PW) || '');
my ($res) = split "\n", decode('UTF-8', $pw ^ $otp), 2;
return $res;
}
else {
die
}
}
}
sub _warn_crypt {
my $self = shift;
state $once = 0;
if ( !TRUE_RANDOM && !$once && LOGIN_SCHEME eq fc 'plain' ) {
$self->log->warn("Falling back to pseudo random generation. Please install Crypt::URandom");
$once = 1;
}
}
"""