Source code for compiler.worker
"""Initialize the Celery application."""
from typing import Any
from base64 import b64decode
import docker
from celery.signals import task_prerun, celeryd_init, worker_init, celeryd_init
import boto3
from arxiv.vault.manager import ConfigManager
from .factory import create_app
from .celery import celery_app
app = create_app()
app.app_context().push() # type: ignore
if app.config['VAULT_ENABLED']:
__secrets__ = ConfigManager(app.config)
else:
__secrets__ = None
[docs]@celeryd_init.connect
def get_secrets(*args: Any, **kwargs: Any) -> None:
"""Collect any required secrets from Vault, and get the convert image."""
if not app.config['VAULT_ENABLED']:
print('Vault not enabled; skipping')
return
for key, value in __secrets__.yield_secrets():
app.config[key] = value
print('updated secrets')
[docs]@celeryd_init.connect
def verify_converter_image_up_to_date(*args: Any, **kwargs: Any) -> None:
"""Upon startup, pull the compiler image."""
image = app.config['CONVERTER_DOCKER_IMAGE']
ecr_registry, _ = image.split('/', 1)
client = docker.from_env()
# Get login credentials from AWS for the ECR registry.
ecr = boto3.client('ecr',
region_name=app.config.get('AWS_REGION', 'us-east-1'))
response = ecr.get_authorization_token()
token = b64decode(response['authorizationData'][0]['authorizationToken'])
username, password = token.decode('utf-8').split(':', 1)
# Log in to the ECR registry with Docker.
client.login(username, password, registry=ecr_registry)
client.images.pull(image)
[docs]@task_prerun.connect
def verify_secrets_up_to_date(*args: Any, **kwargs: Any) -> None:
"""Verify that any required secrets from Vault are up to date."""
if not app.config['VAULT_ENABLED']:
print('Vault not enabled; skipping')
return
for key, value in __secrets__.yield_secrets():
app.config[key] = value
print('updated secrets')