Source code for registry.routes

"""Provides Flask integration for the external user interface."""

from typing import Any, Callable
from datetime import datetime, timedelta
from functools import wraps
from pytz import timezone

from werkzeug.urls import Href, url_encode, url_parse, url_unparse, url_encode
from flask import Blueprint, render_template, url_for, abort, request, \
    make_response, redirect, current_app, send_file, Response, redirect

from arxiv import status
from arxiv.users.auth.decorators import scoped
from arxiv.users.auth import scopes
from arxiv.users import domain
from arxiv.base import logging

from werkzeug.exceptions import BadRequest

from . import oauth2

EASTERN = timezone('US/Eastern')

logger = logging.getLogger(__name__)

blueprint = Blueprint('oauth', __name__, url_prefix='')


[docs]def redirect_to_login(*args, **kwargs): """Send the user to log in, with a pointer back to the current URL.""" query = url_encode({'next_page': request.url}) parts = url_parse(url_for('login')).replace(query=query) return redirect(url_unparse(parts))
[docs]@blueprint.route('/token', methods=['POST']) def issue_token() -> Response: """Client authentication endpoint.""" logger.debug('Request to issue token with params %s', request.form) server = current_app.server logger.debug('Got OAuth2 server %s', id(server)) response = server.create_token_response() logger.debug('Generated response %s', response) return response
[docs]@blueprint.route('/authorize', methods=['GET', 'POST']) @scoped(unauthorized=redirect_to_login) def authorize(): """User-facing endpoint for authorization code (three-legged) workflow.""" server = current_app.server if request.method == 'GET': try: grant_user = oauth2.OAuth2User(request.session.user) grant = server.validate_consent_request(end_user=grant_user) return render_template( 'registry/authorize.html', grant=grant, user=request.session.user ) except oauth2.OAuth2Error as e: logger.debug('Got OAuth2Error: %s', e) raise BadRequest(str(e)) from e elif request.method == 'POST': if request.form['confirm'] == 'ok': logger.debug('User authorizes client') grant_user = oauth2.OAuth2User(request.session.user) else: logger.debug('User has not authorized client') grant_user = None return server.create_authorization_response(grant_user=grant_user)