"""Flask view functions and WTForms form definitions for the JWebmail web UI."""

from urllib.parse import urlparse

from flask import abort, current_app, flash, redirect, render_template, request, url_for
from flask_babel import gettext, lazy_gettext
from flask_login import UserMixin, current_user, login_user, logout_user
from flask_paginate import Pagination, get_page_parameter, get_per_page_parameter
from flask_wtf import FlaskForm
from wtforms import (
    EmailField,
    MultipleFileField,
    PasswordField,
    StringField,
    SubmitField,
    TextAreaField,
    validators,
)

from .model.read_mails import QMAuthError
from .read_mails import add_user, get_read_mails_logged_in
from .read_mails import login as rm_login
from .render_mail import to_mime_type

# Sort keys accepted by ``displayheaders``; a leading "!" on the request
# parameter is stripped before the key is checked against this set.
_SORT_KEYS = ("date", "size", "sender")


class JWebmailUser(UserMixin):
    """User object handed to flask-login; ``id`` is the mail address."""

    def __init__(self, mail_addr, password):
        self.id = mail_addr
        self.password = password


class LoginForm(FlaskForm):
    """Login form: an e-mail address as username plus a 5-35 character password."""

    username = StringField(lazy_gettext("Username"), [validators.Email()])
    password = PasswordField(
        lazy_gettext("Password"), [validators.Length(min=5, max=35)]
    )


class WriteForm(FlaskForm):
    """Compose form; only the recipient and the subject are required."""

    send_to = EmailField("Send to", [validators.InputRequired()])
    subject = StringField("Subject", [validators.InputRequired()])
    cc = StringField("CC")
    bcc = StringField("BCC")
    answer_to = EmailField("Answer to")
    content = TextAreaField("Content")
    attachments = MultipleFileField("Attachments")
    submit = SubmitField("Send")


def _is_local_redirect_target(target):
    """Return True when *target* is a relative URL that stays on this site."""
    # Browsers treat "\" like "/", so "/\evil.example" would otherwise pass
    # as a plain path even though it is followed as "//evil.example".
    parsed = urlparse(target.replace("\\", "/"))
    # A scheme without netloc ("https:evil.example") can also leave the site.
    return not parsed.scheme and not parsed.netloc


def _attachment_disposition(filename):
    """Build a ``Content-Disposition: attachment`` value for *filename*.

    The filename is emitted as a quoted-string with backslashes and double
    quotes escaped and CR/LF removed. A missing or empty filename yields a
    bare ``attachment``.
    """
    if not filename:
        return "attachment"
    cleaned = str(filename).replace("\r", "").replace("\n", "")
    escaped = cleaned.replace("\\", "\\\\").replace('"', '\\"')
    return f'attachment; filename="{escaped}"'


def login():
    """Show the login form and authenticate the user on submit.

    Already authenticated users are redirected (307) to ``displayheaders``.
    After a successful login the user is redirected (303) to the ``next``
    query parameter if it is a local URL, otherwise to ``displayheaders``.
    A ``next`` value pointing off-site aborts with 401.
    """
    if current_user.is_authenticated:
        return redirect(url_for("displayheaders"), 307)

    form = LoginForm()
    warn = ""
    if form.validate_on_submit():
        if rm_login(form.username.data, form.password.data):
            user = JWebmailUser(form.username.data, form.password.data)
            add_user(user)
            login_user(user)

            target = request.args.get("next")
            if target and not _is_local_redirect_target(target):
                abort(401)
            return redirect(target or url_for("displayheaders"), 303)
        warn = gettext("login failed")

    # NOTE(review): the form is served with status 401 on every render,
    # including a plain GET -- kept as in the original; confirm it is intended.
    return render_template("login.html", login_form=form, warn=warn), 401


def logout():
    """Log the current user out and redirect (303) to the login page."""
    logout_user()
    return redirect(url_for("login"), 303)


def about():
    """Render the about page with admin contact, request and language info."""
    view_model = {
        "scriptadmin": current_app.config["JWEBMAIL"]["ADMIN_MAIL"],
        "http_host": request.host,
        "request_uri": request.full_path,
        "remote_addr": request.remote_addr,
        "languages": current_app.extensions["babel"].instance.list_translations(),
    }
    return render_template("about.html", **view_model)


def displayheaders(folder=""):
    """List the mail headers of *folder*, paginated, or the search results.

    Query parameters: ``sort`` (one of date/size/sender, optionally prefixed
    with "!"; default "!date"), ``search``, and the flask-paginate page and
    per-page parameters. An unknown folder renders an error page with 404;
    an unknown or empty sort key aborts with 400.
    """
    folders = get_read_mails_logged_in().folders()
    if folder and folder not in folders:
        # NOTE(review): every other template here is named "*.html"; confirm
        # that a template called plain "error" really exists.
        return render_template("error", error="no_folder", links=folders), 404

    sort = request.args.get("sort", "!date")
    search = request.args.get("search")

    # startswith() instead of sort[0]: an empty "?sort=" must yield a 400,
    # not an IndexError.
    sort_key = sort[1:] if sort.startswith("!") else sort
    if sort_key not in _SORT_KEYS:
        abort(400)

    count = get_read_mails_logged_in().count(folder)

    page = request.args.get(get_page_parameter(), type=int, default=1)
    per_page = request.args.get(get_per_page_parameter(), type=int, default=10)
    pgn = Pagination(
        page=page,
        per_page=per_page,
        total=count["total_mails"],
        record_name="mails",
        css_framework="bulma",
        display_msg=gettext(
            "displaying {start} - {end} of {total} {record_name}"
        ),
        inner_window=1,
        outer_window=0,
    )

    if search:
        headers = get_read_mails_logged_in().search(search, folder)
    else:
        start = (pgn.page - 1) * pgn.per_page
        headers = get_read_mails_logged_in().read_headers_for(
            folder=folder,
            start=start,
            end=start + pgn.per_page,
            sort=sort,
        )

    vals = {
        "folder": folder,
        "pgn": pgn,
        "msgs": headers,
        "mail_folders": folders,
        "total_size": count["byte_size"],
        "total_new_mails": count["unread_mails"],
    }
    return render_template("displayheaders.html", **vals)


def readmail(msgid):
    """Render a single mail; a ``QMAuthError`` yields the not-found page (404)."""
    try:
        mail = get_read_mails_logged_in().show("", msgid)
    except QMAuthError:
        return render_template("not_found.html"), 404
    return render_template("readmail.html", msg=mail)


def writemail():
    """Render the compose page with an empty ``WriteForm``."""
    return render_template("writemail.html", form=WriteForm())


def move(folder):
    """Move the mails named by the ``mail`` query args from *folder*.

    The destination is the required ``folder`` query argument.

    Raises:
        ValueError: if the source or the destination folder does not exist.
    """
    folders = get_read_mails_logged_in().folders()

    mm = request.args.getlist("mail")
    to_folder = request.args["folder"]

    if folder not in folders or to_folder not in folders:
        raise ValueError("folder not valid")

    for m in mm:
        get_read_mails_logged_in().move(m, folder, to_folder)

    flash(gettext("succ_move"))
    return redirect(url_for("displayheaders"), 303)


def rawmail(msgid):
    """Return the raw body of a mail (or of the part at ``path``) with headers.

    A part whose disposition is "attachment" gets a ``Content-Disposition``
    header; ``text/*`` content types get ``charset=UTF-8`` appended.
    """
    path = request.args.get("path", "")
    content = get_read_mails_logged_in().raw("", msgid, path)
    head = content["head"]

    headers = []
    disposition = head.get("content_disposition")
    if disposition and disposition.lower() == "attachment":
        # The filename comes from the mail itself, so quote/escape it and
        # tolerate its absence instead of failing with a KeyError.
        headers.append(
            ("Content-Disposition", _attachment_disposition(head.get("filename")))
        )

    ct = to_mime_type(head)
    if ct.startswith("text/"):
        ct += "; charset=UTF-8"
    headers.append(("Content-Type", ct))

    return content["body"], headers


def sendmail():
    """Validate the submitted ``WriteForm`` and send the mail.

    An invalid form aborts with 400. On a send error the compose page is
    rendered again (400); on success the user is redirected (303) to
    ``displayheaders``.
    """
    form = WriteForm()
    if not form.validate():
        abort(400)

    mail = {
        # The form field is declared as ``send_to``; there is no ``to`` field.
        "to": form.send_to.data,
        "message": form.content.data,
        "subject": form.subject.data,
        "cc": form.cc.data,
        "bcc": form.bcc.data,
        "reply": form.answer_to.data,
        "attach": form.attachments.data,
        "from": "",
    }

    # NOTE(review): ``send_mail`` is neither imported nor defined in the part
    # of this module that is visible here; confirm where it lives and bring it
    # into scope, otherwise this call raises NameError.
    error = send_mail(mail)

    if error:
        # Pass the form back so the template can re-render the fields, as
        # ``writemail`` does.
        return (
            render_template(
                "writemail.html", form=form, warning=gettext("error_send")
            ),
            400,
        )

    flash(gettext("succ_send"))
    return redirect(url_for("displayheaders"), 303)


# NOTE: the string below is Perl source kept as a bare string statement; it
# has no runtime effect. It appears to be the not-yet-ported reference
# implementation of mail removal and session password handling.
"""
sub remove {
    my $self = shift;

    my $v = $self->validation;
    $v->csrf_protect;
    $v->required('mail');

    if ($v->has_error) {
        $self->reply->exception('errors in ' . join('', $v->failed->@*));
        return;
    }

    my $auth = $self->stash(STS_AUTH);
    my $mm = $self->every_param('mail');
    my $folder = $self->stash('folder');

    $self->users->remove($auth, $folder, $_) for @$mm;

    $self->res->code(303);
    $self->redirect_to('displayheaders');
}


### session password handling

use constant {
    S_PASSWD => 'pw',
    S_OTP_S3D_PW => 'otp_s3d_pw'
};

sub _rand_data {
    my $len = shift;

    if (TRUE_RANDOM) {
        #return makerandom_octet(Length => $len, Strength => 0); # was used for Crypt::Random
        return urandom($len);
    }
    else {
        my $res = '';
        for (0..$len-1) {
            vec($res, $_, 8) = int rand 256;
        }

        return $res;
    }
}

sub _session_passwd {
    my ($self, $passwd, $challenge) = @_;
    my $secAlg = LOGIN_SCHEME;
    $self->_warn_crypt;

    if (defined $passwd) { # set
        if ($secAlg eq fc 'cram_md5') {
            $self->session(S_PASSWD() => $passwd, challenge => $challenge);
        }
        elsif ($secAlg eq fc 'plain') {
            unless ($passwd) {
                $self->s3d(S_PASSWD, '');
                delete $self->session->{S_OTP_S3D_PW()};
                return;
            }
            die "'$passwd' contains invalid character \\n" if $passwd =~ /\n/;
            if (length $passwd < 20) {
                $passwd .= "\n" . ' ' x (20 - length($passwd) - 1);
            }
            my $passwd_utf8 = encode('UTF-8', $passwd);
            my $rand_bytes = _rand_data(length $passwd_utf8);
            $self->s3d(S_PASSWD, b64_encode($passwd_utf8 ^ $rand_bytes, ''));
            $self->session(S_OTP_S3D_PW, b64_encode($rand_bytes, ''));
        }
        else {
            die
        }
    }
    else { # get
        if ($secAlg eq fc 'cram_md5') {
            wantarray or carp "you forgot the challenge";
            return ($self->session(S_PASSWD), $self->session('challenge'));
        }
        elsif ($secAlg eq fc 'plain') {
            my $pw = b64_decode($self->s3d(S_PASSWD) || '');
            my $otp = b64_decode($self->session(S_OTP_S3D_PW) || '');
            my ($res) = split "\n", decode('UTF-8', $pw ^ $otp), 2;
            return $res;
        }
        else {
            die
        }
    }
}

sub _warn_crypt {
    my $self = shift;

    state $once = 0;

    if ( !TRUE_RANDOM && !$once && LOGIN_SCHEME eq fc 'plain' ) {
        $self->log->warn("Falling back to pseudo random generation. Please install Crypt::URandom");
        $once = 1;
    }
}
"""