Source code for compiler.routes

"""
Provides the main API blueprint for compilation.

Notes
-----
mypy doesn't have types for flask.Headers.extend, so those lines are
excluded from type checking.

"""

from typing import Callable, Union, Iterable, Tuple, Optional
from functools import wraps
from http import HTTPStatus as status
from werkzeug.exceptions import Unauthorized, Forbidden, BadRequest
from werkzeug.wrappers import Response as WkzResponse
from flask.json import jsonify
from flask import Blueprint, current_app, redirect, request, g, send_file
from flask import Response as FlaskResponse

from arxiv.users.auth.decorators import scoped
from arxiv.users.auth import scopes
from arxiv.base import logging
from arxiv.users.domain import Session, Scope
from arxiv.users.auth import scopes
from arxiv.users.auth.decorators import scoped

from . import controllers
from .domain import Task

Response = Union[FlaskResponse, WkzResponse]

logger = logging.getLogger(__name__)

blueprint = Blueprint('api', __name__, url_prefix='')

base_url = '/<string:source_id>/<string:checksum>/<string:output_format>'


[docs]def authorizer(scope: Scope) -> Callable[[Task], bool]: """Make an authorizer function for injection into a controller.""" def inner(task: Task) -> bool: """Check whether the session is authorized for a specific resource.""" if not task.owner: # If there is no owner, this is a public resource. return True return (request.auth.is_authorized(scope, task.task_id) or (request.auth.user and str(request.auth.user.user_id) == str(task.owner))) return inner
[docs]def resource_id(source_id: str, checksum: str, output_format: str) -> str: """Get the resource ID for an endpoint.""" return f"{source_id}/{checksum}/{output_format}"
[docs]@blueprint.route('/status', methods=['GET']) def get_service_status() -> Union[str, Response]: """Get information about the current status of compilation service.""" data, code, headers = controllers.service_status() response: Response = jsonify(data) response.status_code = code response.headers.extend(headers.items()) # type: ignore return response
[docs]@blueprint.route('/', methods=['POST']) @scoped(scopes.CREATE_COMPILE) def compile() -> Response: """Request that a source package be compiled.""" request_data = request.get_json(force=True) token = request.environ['token'] logger.debug('Request for compilation: %s', request_data) logger.debug('Got token: %s', token) data, code, headers = controllers.compile( request_data, token, request.auth, authorizer(scopes.CREATE_COMPILE) ) response: Response = jsonify(data) response.status_code = code response.headers.extend(headers.items()) # type: ignore return response
[docs]@blueprint.route(base_url, methods=['GET']) @scoped(scopes.READ_COMPILE, resource=resource_id) def get_status(source_id: str, checksum: str, output_format: str) -> Response: """Get the status of a compilation task.""" data, code, headers = controllers.get_status( source_id, checksum, output_format, authorizer(scopes.READ_COMPILE) ) if code in [status.SEE_OTHER, status.FOUND]: return redirect(headers['Location'], code=code) response: Response = jsonify(data) response.status_code = code response.headers.extend(headers.items()) # type: ignore return response
[docs]@blueprint.route(f'{base_url}/log', methods=['GET']) @scoped(scopes.READ_COMPILE, resource=resource_id) def get_log(source_id: str, checksum: str, output_format: str) -> Response: """Get a compilation log.""" resp = controllers.get_log(source_id, checksum, output_format, authorizer(scopes.READ_COMPILE)) data, status_code, headers = resp response: Response = send_file(data['stream'], mimetype=data['content_type'], attachment_filename=data['filename']) return response
[docs]@blueprint.route(f'{base_url}/product', methods=['GET']) @scoped(scopes.READ_COMPILE, resource=resource_id) def get_product(source_id: str, checksum: str, output_format: str) -> Response: """Get a compilation product.""" data, code, head = controllers.get_product(source_id, checksum, output_format, authorizer(scopes.READ_COMPILE)) response: Response = send_file(data['stream'], mimetype=data['content_type'], attachment_filename=data['filename']) response.set_etag(head.get('ETag')) return response