Source code for compiler.worker
"""Initialize the Celery application."""
from typing import Any
from base64 import b64decode
import docker
from celery.signals import task_prerun, celeryd_init, worker_init, celeryd_init
import boto3
from arxiv.vault.manager import ConfigManager
from .factory import create_app
from .celery import celery_app
app = create_app()
app.app_context().push()    # type: ignore
if app.config['VAULT_ENABLED']:
    __secrets__ = ConfigManager(app.config)
else:
    __secrets__ = None
[docs]@celeryd_init.connect
def get_secrets(*args: Any, **kwargs: Any) -> None:
    """Collect any required secrets from Vault, and get the convert image."""
    if not app.config['VAULT_ENABLED']:
        print('Vault not enabled; skipping')
        return
    for key, value in __secrets__.yield_secrets():
        app.config[key] = value
    print('updated secrets') 
[docs]@celeryd_init.connect
def verify_converter_image_up_to_date(*args: Any, **kwargs: Any) -> None:
    """Upon startup, pull the compiler image."""
    image = app.config['CONVERTER_DOCKER_IMAGE']
    ecr_registry, _ = image.split('/', 1)
    client = docker.from_env()
    # Get login credentials from AWS for the ECR registry.
    ecr = boto3.client('ecr',
                       region_name=app.config.get('AWS_REGION', 'us-east-1'))
    response = ecr.get_authorization_token()
    token = b64decode(response['authorizationData'][0]['authorizationToken'])
    username, password = token.decode('utf-8').split(':', 1)
    # Log in to the ECR registry with Docker.
    client.login(username, password, registry=ecr_registry)
    client.images.pull(image) 
[docs]@task_prerun.connect
def verify_secrets_up_to_date(*args: Any, **kwargs: Any) -> None:
    """Verify that any required secrets from Vault are up to date."""
    if not app.config['VAULT_ENABLED']:
        print('Vault not enabled; skipping')
        return
    for key, value in __secrets__.yield_secrets():
        app.config[key] = value
    print('updated secrets')