arxiv.integration.kinesis.consumer package
Provides base classes and tools for building agents (Kinesis consumers).
We use Kinesis streams to broker notifications that concern multiple services in the arXiv system. For example, at publication time, the publication process generates notifications about new arXiv papers so that services like search can update themselves.
This module provides a base class for building agents with Kinesis streams, BaseConsumer. The objective is to provide all of the stream-management (e.g. checkpointing) and error-handling boilerplate needed for any Kinesis consumer, so that we can focus on building the idiosyncratic functional bits in arXiv services.
Using BaseConsumer
You (the developer) should be able to create a minimal agent by:
1. Defining a class that inherits from BaseConsumer.
2. Defining an instance method on that class with the signature
   def process_record(self, record: dict) -> None:
   that implements application-specific processing for each notification.
3. Calling process_stream() with your consumer class and an application config (e.g. from Flask).
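The steps above might look roughly like the following sketch. So that it runs without the arXiv package installed, it defines a small stand-in for BaseConsumer; in a real agent that class would be imported from arxiv.integration.kinesis.consumer instead. The shape of the record (a dict whose 'Data' value holds a JSON payload as bytes) and the field name 'document_id' are assumptions made for illustration, as is the class name IndexingConsumer.

```python
import json


class BaseConsumer:
    """Stand-in for arxiv.integration.kinesis.consumer.BaseConsumer.

    Only here so the sketch runs on its own; a real agent would import the
    actual base class instead of defining this one.
    """


class IndexingConsumer(BaseConsumer):
    """Hypothetical agent that collects the IDs of newly announced papers."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.seen = []

    def process_record(self, record: dict) -> None:
        """Application-specific handling of a single notification."""
        # Assumption: the payload is JSON-encoded bytes under 'Data'.
        payload = json.loads(record['Data'].decode('utf-8'))
        self.seen.append(payload['document_id'])


consumer = IndexingConsumer()
consumer.process_record({'Data': b'{"document_id": "1234.56789"}'})
print(consumer.seen)
```

A class like this would then be handed to process_stream() together with the application config.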
Your config should include the following:
- KINESIS_STREAM (str): Name of the stream to consume.
- KINESIS_SHARD_ID (str): E.g. "shardId-000000000000".
- AWS_ACCESS_KEY_ID (str): Access key ID with read access to Kinesis streams.
- AWS_SECRET_ACCESS_KEY (str): Secret access key with read access to Kinesis streams.
- AWS_REGION (str): E.g. us-east-1.
- KINESIS_ENDPOINT (str): This should be None in production, but can be set to something else for integration testing (e.g. with localstack).
- KINESIS_VERIFY (str): This should be "true" in production, but can be disabled when doing integration testing (since localstack certs won’t verify).
- KINESIS_START_TYPE (str): How to get the first shard iterator (if a starting position is not available via the checkpointer). Currently supports TRIM_HORIZON and AT_TIMESTAMP.
- KINESIS_START_AT (str): If using AT_TIMESTAMP, the point in time from which to start in the stream. Should have the format '%Y-%m-%dT%H:%M:%S'.
If you’re using the DiskCheckpointManager provided here (used by default in process_stream()), you should also set:
- KINESIS_CHECKPOINT_VOLUME (str): Full path to a directory where the consumer should store its position. Must be readable/writeable.
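A config covering the keys described above could look like the sketch below. Every value is a placeholder rather than a real credential or arXiv setting, and the stream name and checkpoint path are made up for illustration. The final lines use the standard library to check that KINESIS_START_AT matches the documented format.

```python
from datetime import datetime

# Placeholder values throughout; substitute your own settings.
config = {
    'KINESIS_STREAM': 'MyNotificationStream',         # hypothetical name
    'KINESIS_SHARD_ID': 'shardId-000000000000',
    'AWS_ACCESS_KEY_ID': 'placeholder-access-key',
    'AWS_SECRET_ACCESS_KEY': 'placeholder-secret-key',
    'AWS_REGION': 'us-east-1',
    'KINESIS_ENDPOINT': None,                          # None in production
    'KINESIS_VERIFY': 'true',                          # "true" in production
    'KINESIS_START_TYPE': 'AT_TIMESTAMP',
    'KINESIS_START_AT': '2019-10-07T19:29:32',
    'KINESIS_CHECKPOINT_VOLUME': '/tmp/checkpoints',   # hypothetical path
}

# Raises ValueError if the timestamp does not match the documented format.
start_at = datetime.strptime(config['KINESIS_START_AT'], '%Y-%m-%dT%H:%M:%S')
print(start_at.isoformat())
```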
Testing and development
The easiest way to write tests for Kinesis consumers is to mock the [boto3.client factory](http://boto3.readthedocs.io/en/latest/reference/services/kinesis.html). Unit tests for this module can be found in arxiv.base.agent.tests, most of which mock boto3 in this way.
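As a rough illustration of this approach, the sketch below builds the kind of fake client that a patched boto3.client factory might return, using unittest.mock from the standard library. get_shard_iterator and get_records are methods of the boto3 Kinesis client, and the response keys follow its documented response shape. The helper read_one_batch and the stream name are hypothetical; the helper stands in for whatever code under test talks to the client.

```python
from unittest import mock

# Fake Kinesis client with canned responses.
fake_client = mock.MagicMock()
fake_client.get_shard_iterator.return_value = {'ShardIterator': 'iter-0'}
fake_client.get_records.return_value = {
    'Records': [
        {'SequenceNumber': '1', 'PartitionKey': 'a', 'Data': b'first'},
        {'SequenceNumber': '2', 'PartitionKey': 'a', 'Data': b'second'},
    ],
    'NextShardIterator': 'iter-1',
    'MillisBehindLatest': 0,
}


def read_one_batch(client, stream_name, shard_id, limit=50):
    """Hypothetical helper: fetch one batch of records from a shard."""
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType='TRIM_HORIZON',
    )['ShardIterator']
    response = client.get_records(ShardIterator=iterator, Limit=limit)
    return response['Records'], response['NextShardIterator']


records, next_iterator = read_one_batch(
    fake_client, 'MyNotificationStream', 'shardId-000000000000'
)

# The mock records how it was called, so a test can assert on the calls.
fake_client.get_records.assert_called_once_with(ShardIterator='iter-0', Limit=50)
print(len(records), next_iterator)
```

In an actual test, a fake like this would typically be installed with something like mock.patch('boto3.client', return_value=fake_client) around the code being exercised.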
For integration tests, or when developing against a “live” Kinesis stream, [Localstack](https://github.com/localstack/localstack) provides a Kinesis implementation for testing/development purposes (port 4568). You can use the config parameters above to point to a local instance of Localstack (e.g. run with Docker).
- class arxiv.integration.kinesis.consumer.BaseConsumer(stream_name='', shard_id='', access_key=None, secret_key=None, region='', checkpointer=None, back_off=5, batch_size=50, endpoint=None, verify=True, duration=None, start_type='AT_TIMESTAMP', start_at='2019-10-07T19:29:32', tries=5, delay=5, max_delay=None, backoff=1, jitter=0, **extra)
  Bases: object
  Kinesis stream consumer.
  Consumes a single shard from a single stream, and checkpoints on disk (to reduce external dependencies).
  - get_or_create_stream()
    Wait for the stream, and create it if it doesn’t exist.
    Return type: None
  - get_records(iterator, limit, tries=5, delay=5, max_delay=None, backoff=1, jitter=0)
    Get the next batch of limit or fewer records.
  - process_record(record)
    Process a single record from the stream.
    Parameters: record (dict)
    Return type: None
  - wait_for_stream(tries=5, delay=5, max_delay=None, backoff=2, jitter=0)
    Wait for the stream to become available.
    If the stream becomes available, returns None. Otherwise, raises a StreamNotAvailable exception.
    Raises: StreamNotAvailable – Raised when the stream could not be reached.
    Return type: None
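The tries, delay, max_delay, backoff and jitter parameters resemble those of the retry package on PyPI. Assuming they follow that package's conventions (an assumption; the documentation here does not say so), the waits between attempts could be estimated as in this sketch. The helper wait_schedule is hypothetical and only models a fixed numeric jitter.

```python
def wait_schedule(tries=5, delay=5, max_delay=None, backoff=2, jitter=0):
    """Return the assumed waits (in seconds) between successive attempts."""
    waits = []
    current = delay
    # With `tries` attempts there are at most `tries - 1` waits.
    for _ in range(tries - 1):
        waits.append(current)
        # Assumed rule: multiply by backoff, add jitter, then apply the cap.
        current = current * backoff + jitter
        if max_delay is not None:
            current = min(current, max_delay)
    return waits


print(wait_schedule())              # defaults listed for wait_for_stream
print(wait_schedule(max_delay=15))  # same, but capped at 15 seconds
```

Under these assumptions, a backoff of 1 (the default listed for get_records) would give a constant wait between attempts.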
- class arxiv.integration.kinesis.consumer.DiskCheckpointManager(base_path, stream_name, shard_id)
  Bases: object
  Provides on-disk loading and updating of consumer checkpoints.
  You can substitute any other mechanism that you prefer for checkpointing when you instantiate your consumer, so long as the passed object:
  - Has an instance method checkpoint(self, position: str) -> None: that stores position, and
  - Exposes a property .position that is the last value passed to checkpoint.
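As a minimal sketch of that interface, the hypothetical class below keeps the checkpoint in memory rather than on disk. It is not part of this package; it only illustrates the two requirements listed above, and returning None before any checkpoint has been stored is an assumption.

```python
from typing import Optional


class InMemoryCheckpointManager:
    """Hypothetical checkpointer that keeps the position in memory only."""

    def __init__(self) -> None:
        self._position: Optional[str] = None

    def checkpoint(self, position: str) -> None:
        """Store the most recent position."""
        self._position = position

    @property
    def position(self) -> Optional[str]:
        """Last value passed to checkpoint(), or None if there is none yet."""
        return self._position


checkpointer = InMemoryCheckpointManager()
checkpointer.checkpoint('12345')
print(checkpointer.position)
```

An object like this could be passed as the checkpointer argument when instantiating a consumer. Positions held only in memory are lost on restart, so this would suit tests more than long-running agents.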
- arxiv.integration.kinesis.consumer.process_stream(Consumer, config, checkpointmanager=None, duration=None, extra={})
  Configure and run an agent (Kinesis consumer).
  Parameters:
  - Consumer (type) – A class that inherits from BaseConsumer.
  - config (dict) – An application config (e.g. a Flask config).
  - duration (int) – Time (in seconds) to consume records. If None (default), will run “forever”.
  - extra (kwargs) – Extra keyword arguments passed to the Consumer constructor.
  Return type: None