Source code for arxiv.submission.services.stream.stream

from typing import Optional

import boto3
from botocore.exceptions import ClientError
from retry import retry

from arxiv.integration.meta import MetaIntegration
from arxiv.base import logging
from arxiv.base.globals import get_application_config, get_application_global

from ...domain import Submission, Event
from ...serializer import dumps

logger = logging.getLogger(__name__)


[docs]class StreamPublisher(metaclass=MetaIntegration): def __init__(self, stream: str, partition_key: str, aws_access_key_id: str, aws_secret_access_key: str, region_name: str, endpoint_url: Optional[str] = None, verify: bool = True) -> None: self.stream = stream self.partition_key = partition_key self.client = boto3.client('kinesis', region_name=region_name, endpoint_url=endpoint_url, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, verify=verify)
[docs] @classmethod def init_app(cls, app: object = None) -> None: """Set default configuration params for an application instance.""" config = get_application_config(app) config.setdefault('AWS_ACCESS_KEY_ID', '') config.setdefault('AWS_SECRET_ACCESS_KEY', '') config.setdefault('AWS_REGION', 'us-east-1') config.setdefault('KINESIS_ENDPOINT', None) config.setdefault('KINESIS_VERIFY', True) config.setdefault('KINESIS_STREAM', 'SubmissionEvents') config.setdefault('KINESIS_PARTITION_KEY', '0')
[docs] @classmethod def get_session(cls, app: object = None) -> 'StreamPublisher': """Get a new session with the stream.""" config = get_application_config(app) aws_access_key_id = config['AWS_ACCESS_KEY_ID'] aws_secret_access_key = config['AWS_SECRET_ACCESS_KEY'] aws_region = config['AWS_REGION'] kinesis_endpoint = config['KINESIS_ENDPOINT'] kinesis_verify = config['KINESIS_VERIFY'] kinesis_stream = config['KINESIS_STREAM'] partition_key = config['KINESIS_PARTITION_KEY'] return cls(kinesis_stream, partition_key, aws_access_key_id, aws_secret_access_key, aws_region, kinesis_endpoint, kinesis_verify)
[docs] @classmethod def current_session(cls) -> 'StreamPublisher': """Get/create :class:`.StreamPublisher` for this context.""" g = get_application_global() if not g: return cls.get_session() elif 'stream' not in g: g.stream = cls.get_session() # type: ignore return g.stream # type: ignore
[docs] def is_available(self, **kwargs) -> bool: """Test our ability to put records.""" data = bytes(dumps({}), encoding='utf-8') try: self.client.put_record(StreamName=self.stream, Data=data, PartitionKey=self.partition_key) except Exception as e: logger.error('Encountered error while putting to stream: %s', e) return False return True
def _create_stream(self) -> None: try: self.client.create_stream(StreamName=self.stream, ShardCount=1) except self.client.exceptions.ResourceInUseException: logger.info('Stream %s already exists', self.stream) return def _wait_for_stream(self, retries: int = 0, delay: int = 0) -> None: waiter = self.client.get_waiter('stream_exists') waiter.wait( StreamName=self.stream, WaiterConfig={ 'Delay': delay, 'MaxAttempts': retries } )
[docs] @retry(RuntimeError, tries=5, delay=2, backoff=2) def initialize(self) -> None: """Perform initial checks, e.g. at application start-up.""" logger.info('initialize Kinesis stream') data = bytes(dumps({}), encoding='utf-8') try: self.client.put_record(StreamName=self.stream, Data=data, PartitionKey=self.partition_key) logger.info('storage service is already available') except ClientError as exc: if exc.response['Error']['Code'] == 'ResourceNotFoundException': logger.info('stream does not exist; creating') self._create_stream() logger.info('wait for stream to be available') self._wait_for_stream(retries=10, delay=5) raise RuntimeError('Failed to initialize stream') from exc except self.client.exceptions.ResourceNotFoundException: logger.info('stream does not exist; creating') self._create_stream() logger.info('wait for stream to be available') self._wait_for_stream(retries=10, delay=5) except Exception as exc: raise RuntimeError('Failed to initialize stream') from exc return
[docs] def put(self, event: Event, before: Submission, after: Submission) -> None: """Put an :class:`.Event` on the stream.""" payload = {'event': event, 'before': before, 'after': after} data = bytes(dumps(payload), encoding='utf-8') self.client.put_record(StreamName=self.stream, Data=data, PartitionKey=self.partition_key)