arxiv.integration.kinesis.consumer package

Provides base classes and tools for building agents (Kinesis consumers).

We use Kinesis streams to broker notifications that concern multiple services in the arXiv system. For example, at publication time, the publication process generates notifications about new arXiv papers so that services like search can update themselves.

This module provides a base class for building agents with Kinesis streams, BaseConsumer. The objective is to provide all of the stream-management (e.g. checkpointing) and error-handling boilerplate needed for any Kinesis consumer, so that we can focus on building the idiosyncratic functional bits in arXiv services.

Using BaseConsumer

You (the developer) should be able to create a minimal agent by:

  1. Defining a class that inherits from BaseConsumer

  2. Defining an instance method on that class with the signature def process_record(self, record: dict) -> None: that implements application-specific processing for each notification.

  3. Calling process_stream() with your consumer class and an application config (e.g. from Flask).
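The three steps above can be sketched as follows. This is a minimal illustration, not a reference implementation: PaperLogger, extract_paper_id, and the "paper_id" key are hypothetical, and the sketch assumes that each record is a dict as returned by boto3 (payload bytes under "Data") carrying a JSON document. The import fallback exists only so the sketch can be read and imported without the package installed.

```python
import json
import logging

try:
    from arxiv.integration.kinesis.consumer import BaseConsumer, process_stream
except ImportError:
    # Stand-ins so the sketch stays importable without arxiv-base installed.
    BaseConsumer = object
    process_stream = None

logger = logging.getLogger(__name__)


def extract_paper_id(record: dict) -> str:
    """Decode a payload, assumed to be JSON with a (hypothetical) 'paper_id' key."""
    payload = json.loads(record["Data"].decode("utf-8"))
    return payload["paper_id"]


class PaperLogger(BaseConsumer):
    """Hypothetical agent that logs the ID of each announced paper."""

    def process_record(self, record: dict) -> None:
        # Application-specific handling for one notification.
        logger.info("New paper: %s", extract_paper_id(record))


def main(config: dict) -> None:
    """Consume for five minutes; omit ``duration`` to run indefinitely."""
    process_stream(PaperLogger, config, duration=300)
```

Keeping the payload handling in a plain function such as extract_paper_id makes it possible to test the application logic without a stream or a consumer instance.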

Your config should include the following:

KINESIS_STREAM: str

Name of the stream to consume.

KINESIS_SHARD_ID: str

E.g. "shardId-000000000000".

AWS_ACCESS_KEY_ID: str

Access key ID with read access to Kinesis streams.

AWS_SECRET_ACCESS_KEY: str

Secret access key with read access to Kinesis streams.

AWS_REGION: str

E.g. us-east-1.

KINESIS_ENDPOINT: str

This should be None in production, but can be set to something else for integration testing (e.g. with localstack).

KINESIS_VERIFY: str

This should be "true" in production, but can be disabled when doing integration testing (since localstack certs won’t verify).

KINESIS_START_TYPE: str

How to get the first shard iterator (if a starting position is not available via the checkpointer). Currently supports TRIM_HORIZON and AT_TIMESTAMP.

KINESIS_START_AT: str

If using AT_TIMESTAMP, the point of time from which to start in the stream. Should have the format '%Y-%m-%dT%H:%M:%S'.

If you’re using the DiskCheckpointManager provided here (used by default in process_stream()), you should also set:

KINESIS_CHECKPOINT_VOLUME: str

Full path to a directory where the consumer should store its position. Must be readable/writeable.
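As an illustration, a config containing the keys listed above might be assembled like this. The stream name, checkpoint path, and the build_config helper are placeholders chosen for the example, and reading credentials from the environment is only one possible approach.

```python
import os
from datetime import datetime

# Format required for KINESIS_START_AT when using AT_TIMESTAMP.
START_AT_FORMAT = "%Y-%m-%dT%H:%M:%S"


def build_config(start: datetime) -> dict:
    """Assemble a consumer config (all literal values are placeholders)."""
    return {
        "KINESIS_STREAM": "PaperAnnouncements",  # hypothetical stream name
        "KINESIS_SHARD_ID": "shardId-000000000000",
        "AWS_ACCESS_KEY_ID": os.environ.get("AWS_ACCESS_KEY_ID", ""),
        "AWS_SECRET_ACCESS_KEY": os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
        "AWS_REGION": "us-east-1",
        "KINESIS_ENDPOINT": None,  # None in production
        "KINESIS_VERIFY": "true",  # "true" in production
        "KINESIS_START_TYPE": "AT_TIMESTAMP",
        "KINESIS_START_AT": start.strftime(START_AT_FORMAT),
        "KINESIS_CHECKPOINT_VOLUME": "/tmp/checkpoints",  # placeholder path
    }


config = build_config(datetime(2019, 10, 7, 19, 29, 32))
```

Generating KINESIS_START_AT with strftime, rather than typing it by hand, keeps the value in the expected format.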

Testing and development

The easiest way to write tests for Kinesis consumers is to mock the [boto3.client factory](http://boto3.readthedocs.io/en/latest/reference/services/kinesis.html). Unit tests for this module can be found in arxiv.base.agent.tests, most of which mock boto3 in this way.
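A mocked client for such tests could look like the sketch below. The make_fake_kinesis_client helper and the payload are hypothetical; the response shapes follow the boto3 Kinesis client (get_shard_iterator returns a "ShardIterator", get_records returns "Records" and a "NextShardIterator").

```python
from unittest import mock


def make_fake_kinesis_client(payloads):
    """Build a MagicMock that answers like a boto3 Kinesis client."""
    client = mock.MagicMock()
    client.get_shard_iterator.return_value = {"ShardIterator": "iterator-0"}
    client.get_records.return_value = {
        "Records": [
            {"SequenceNumber": str(i), "PartitionKey": "0", "Data": data}
            for i, data in enumerate(payloads)
        ],
        "NextShardIterator": "iterator-1",
        "MillisBehindLatest": 0,
    }
    return client


fake = make_fake_kinesis_client([b'{"paper_id": "1234.5678"}'])
response = fake.get_records(ShardIterator="iterator-0", Limit=50)
```

In a unit test, the factory would typically be patched so that it hands back this object, for example with mock.patch("boto3.client", return_value=fake). The exact patch target depends on how the module under test imports boto3, so treat that path as an assumption.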

For integration tests, or developing against a “live” Kinesis stream, [Localstack](https://github.com/localstack/localstack) provides a Kinesis implementation for testing and development (port 4568). You can use the config parameters above to point to a local instance of localstack (e.g. run with Docker).
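One way to derive a local-testing config from an existing one is sketched below. The for_localstack helper is hypothetical, and both the endpoint URL (https on localhost) and the "false" value for KINESIS_VERIFY are assumptions; adjust them to match your localstack setup.

```python
def for_localstack(config: dict, endpoint: str = "https://localhost:4568") -> dict:
    """Return a copy of ``config`` pointed at a local Kinesis endpoint."""
    local = dict(config)  # leave the original config untouched
    local.update({
        "KINESIS_ENDPOINT": endpoint,  # assumed URL; port 4568 as noted above
        "KINESIS_VERIFY": "false",     # assumed value for disabling verification
    })
    return local


production = {"KINESIS_ENDPOINT": None, "KINESIS_VERIFY": "true"}
local = for_localstack(production)
```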

class arxiv.integration.kinesis.consumer.BaseConsumer(stream_name='', shard_id='', access_key=None, secret_key=None, region='', checkpointer=None, back_off=5, batch_size=50, endpoint=None, verify=True, duration=None, start_type='AT_TIMESTAMP', start_at='2019-10-07T19:29:32', tries=5, delay=5, max_delay=None, backoff=1, jitter=0, **extra)

Bases: object

Kinesis stream consumer.

Consumes a single shard from a single stream, and checkpoints on disk (to reduce external dependencies).

get_or_create_stream()

Wait for the stream, and create it if it doesn’t exist.

Return type

None

get_records(iterator, limit, tries=5, delay=5, max_delay=None, backoff=1, jitter=0)

Get the next batch of limit or fewer records.

Return type

Tuple[str, dict]

go()

Run the main processing routine.

Return type

None

new_client()

Generate a new Kinesis client.

Return type

boto3 Kinesis client (the object returned by boto3.client)

process_record(record)

Process a single record from the stream.

Parameters

record (dict) –

Return type

None

process_records(start)

Retrieve and process records starting at start.

Return type

Tuple[str, int]

stop(signal, frame)

Set exit flag for a graceful stop.

Return type

None

wait_for_stream(tries=5, delay=5, max_delay=None, backoff=2, jitter=0)

Wait for the stream to become available.

If the stream becomes available, returns None. Otherwise, raises a StreamNotAvailable exception.

Raises

StreamNotAvailable – Raised when the stream could not be reached.

Return type

None
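The tries, delay, max_delay, backoff, and jitter arguments are not described further here. Their names match the conventions of the third-party retry package; assuming they carry those semantics (an assumption, not something this page confirms), the waits between attempts can be sketched as follows. The delay_schedule helper is hypothetical.

```python
from typing import List, Optional


def delay_schedule(tries: int = 5, delay: float = 5,
                   max_delay: Optional[float] = None,
                   backoff: float = 2, jitter: float = 0) -> List[float]:
    """Waits (in seconds) between attempts, assuming retry-package semantics."""
    waits = []
    current = delay
    for _ in range(tries - 1):  # no wait after the final failed attempt
        waits.append(current)
        # Each wait is multiplied by ``backoff``, then ``jitter`` is added.
        current = current * backoff + jitter
        if max_delay is not None:
            current = min(current, max_delay)  # cap the wait
    return waits


default_waits = delay_schedule()
```

Under these assumptions, the defaults shown for wait_for_stream would wait 5, 10, 20, and 40 seconds between its five attempts, while backoff=1 (the default elsewhere in this module) would keep every wait at 5 seconds.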

class arxiv.integration.kinesis.consumer.DiskCheckpointManager(base_path, stream_name, shard_id)

Bases: object

Provides on-disk loading and updating of consumer checkpoints.

You can substitute any other mechanism that you prefer for checkpointing when you instantiate your consumer, so long as the passed object:

  1. Has an instance method checkpoint(self, position: str) -> None: that stores position, and

  2. Exposes a property .position that is the last value passed to checkpoint.
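For example, an in-memory substitute that satisfies both requirements might look like this. MemoryCheckpointManager is a hypothetical name, and because it loses its position when the process exits, it is only suitable for tests or short-lived runs.

```python
from typing import Optional


class MemoryCheckpointManager:
    """Hypothetical checkpointer that keeps the position in memory only."""

    def __init__(self, position: Optional[str] = None) -> None:
        self._position = position

    @property
    def position(self) -> Optional[str]:
        """The last value passed to ``checkpoint`` (or the initial value)."""
        return self._position

    def checkpoint(self, position: str) -> None:
        """Store ``position`` as the current checkpoint."""
        self._position = position


checkpointer = MemoryCheckpointManager()
checkpointer.checkpoint("49590338271490256608559692538361571095921575989136588898")
```

An object like this can be passed as the checkpointer argument of the consumer, or as the checkpointmanager argument of process_stream().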

checkpoint(position)

Checkpoint at position.

Return type

None

arxiv.integration.kinesis.consumer.process_stream(Consumer, config, checkpointmanager=None, duration=None, extra={})

Configure and run an agent (Kinesis consumer).

Parameters
  • Consumer (type) – A class that inherits from BaseConsumer.

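  • config (dict) – An application config (e.g. a Flask config).

  • checkpointmanager – Object used to load and store the consumer’s position. If None (default), a DiskCheckpointManager is used.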

  • duration (int) – Time (in seconds) to consume records. If None (default), will run “forever”.

  • extra (kwargs) – Extra keyword arguments passed to the Consumer constructor.

Return type

None