agent.consumer module

Submission event consumer.

The submission event consumer is responsible for monitoring submission events, evaluating them against pre-defined rules, and triggering processes to be carried out by the agent.worker.

The consumer is implemented using arxiv.integration.kinesis, and consumes events in the SubmissionEvents Kinesis stream. Events may be generated by submission user interfaces, APIs, and backend components that leverage the arxiv.submission core package. As events are consumed, they are evaluated against a set of registered Rule instances, which map event types and conditions to ProcessType classes.
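As a rough model of how evaluation works, a Rule can be pictured as a record pairing an event type and a predicate with a process class. The sketch below is illustrative only. Event, Rule, RULES, evaluate, CheckCompiles and the "AddSourceContent" event name are hypothetical stand-ins, not the agent.rules API. The sketch also assumes that a condition receives the event together with the submission state before and after it.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, List, Optional, Type


class Process:
    """Hypothetical stand-in for agent.process.Process."""


class CheckCompiles(Process):
    """Hypothetical process type, used only in this example."""


@dataclass
class Event:
    """Minimal stand-in for an arxiv.submission event."""
    event_type: str
    submission_id: int


# Hypothetical: a condition sees the event and the submission before/after it.
Condition = Callable[[Event, Optional[dict], dict], bool]


@dataclass
class Rule:
    event_type: str         # event type the rule listens for
    condition: Condition    # further test on event/submission properties
    process: Type[Process]  # process type to trigger when the rule matches


RULES: List[Rule] = [
    Rule("AddSourceContent",
         lambda event, before, after: after.get("source_format") == "tex",
         CheckCompiles),
]


def evaluate(event: Event, before: Optional[dict],
             after: dict) -> Iterator[Type[Process]]:
    """Yield the process type of every registered rule the event satisfies."""
    for rule in RULES:
        if rule.event_type == event.event_type and rule.condition(event, before, after):
            yield rule.process
```

For example, evaluating an AddSourceContent event whose resulting submission has source_format "tex" yields [CheckCompiles]. Any other event type yields nothing.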

Triggered processes are not run in the consumer application itself, which runs in a single thread. To keep pace with the stream, the consumer spends as little time as possible on each event. It uses the AsyncProcessRunner to dispatch processes to the agent.worker for parallel execution, and then moves on to the next event.

The event lifecycle from the perspective of the consumer looks like this:

  1. A command/event is generated by a submission service, using the arxiv.submission package. The event is stored in the database, and propagated via the SubmissionEvents Kinesis stream. The Kinesis payload includes the event itself, and the state of the submission both before and after the event was applied.

  2. The event is consumed by the agent via the SubmissionEvents Kinesis stream.

  3. The agent evaluates the event against registered Rule instances, using rules.evaluate(). A Rule maps a condition (the event type and event/submission properties) to a Process.

  4. The agent dispatches any triggered Process instances to the agent.worker using the AsyncProcessRunner.
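Steps 2 to 4 can be sketched as a single record handler. For illustration, the sketch assumes that the payload is JSON with "event", "before" and "after" keys. decode_payload and handle_record are hypothetical, and so are the evaluate and dispatch callables, which act as placeholders for rules.evaluate() and the AsyncProcessRunner.

```python
import json
from typing import Callable, Iterable, List, Optional, Tuple


def decode_payload(data: bytes) -> Tuple[dict, Optional[dict], dict]:
    """Step 2: unpack a record payload into (event, before, after).

    The key names are assumed for illustration.
    """
    payload = json.loads(data.decode("utf-8"))
    return payload["event"], payload.get("before"), payload["after"]


def handle_record(
    data: bytes,
    evaluate: Callable[[dict, Optional[dict], dict], Iterable[str]],
    dispatch: Callable[[str, dict], None],
) -> List[str]:
    """Steps 2-4: decode, evaluate against rules, and dispatch without waiting."""
    event, before, after = decode_payload(data)
    triggered = list(evaluate(event, before, after))  # step 3: rule evaluation
    for process_name in triggered:
        dispatch(process_name, event)                 # step 4: hand off to the worker
    return triggered
```

Passing evaluate and dispatch in as callables keeps the handler free of Kinesis and worker dependencies, which makes it easy to exercise in tests. The event and process names in any such test are illustrative.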

Components


Fig. 8 Main components of the event consumer.

The SubmissionEventConsumer defines how records from the SubmissionEvents stream are handled. This is the primary point of control in the agent. As events are received, it relies on agent.rules to determine what processes to carry out, and then dispatches those processes to the agent.worker using the AsyncProcessRunner.

The SubmissionEventConsumer relies on the DatabaseCheckpointManager to keep track of its progress in the SubmissionEvents stream.

The agent.services.database integration module provides access to the agent database. Specifically, it supports creating and loading checkpoints, and storing information about process-relevant events.
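A database-backed checkpoint manager only has to store one position per shard and overwrite it as processing advances. The sketch below uses sqlite3 in place of the agent database. The SQLiteCheckpointManager name, the checkpoint table layout, the position property, and the extra conn argument are all assumptions, made to keep the example self-contained.

```python
import sqlite3
from typing import Optional


class SQLiteCheckpointManager:
    """Hypothetical sqlite3-backed analogue of DatabaseCheckpointManager."""

    def __init__(self, shard_id: str, conn: sqlite3.Connection) -> None:
        self.shard_id = shard_id
        self.conn = conn
        # One row per shard, holding the last processed sequence number.
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoint ("
            " shard_id TEXT PRIMARY KEY, position TEXT NOT NULL)"
        )

    @property
    def position(self) -> Optional[str]:
        """Last checkpointed position for this shard, or None if there is none."""
        row = self.conn.execute(
            "SELECT position FROM checkpoint WHERE shard_id = ?", (self.shard_id,)
        ).fetchone()
        return row[0] if row else None

    def checkpoint(self, position: str) -> None:
        """Record that all records up to position have been processed."""
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                "INSERT OR REPLACE INTO checkpoint (shard_id, position) VALUES (?, ?)",
                (self.shard_id, position),
            )
```

On restart, a consumer would read position and resume reading the shard after that sequence number. Kinesis sequence numbers are long numeric strings, so the sketch stores them as text.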

Processes are defined in agent.process. Each process is a subclass of Process, and may have one or more steps.
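A multi-step process can be modelled as a class that exposes its steps in execution order. This is a hypothetical sketch, not the agent.process.Process API. It assumes that each step is a method receiving the previous step's result. CheckReferences and its placeholder data are invented for illustration.

```python
from typing import Any, Callable, List, Optional

Step = Callable[[Any], Any]


class Process:
    """Hypothetical base class: a process is an ordered list of steps."""

    def __init__(self, submission_id: int) -> None:
        self.submission_id = submission_id

    @property
    def steps(self) -> List[Step]:
        raise NotImplementedError("subclasses list their steps in order")


class CheckReferences(Process):
    """Illustrative two-step process; the data is a placeholder."""

    def fetch_references(self, previous: Optional[Any]) -> List[str]:
        # First step: nothing precedes it, so `previous` is None.
        return ["ref-a", "ref-b"]

    def count_references(self, references: List[str]) -> int:
        # Second step: consumes the first step's output.
        return len(references)

    @property
    def steps(self) -> List[Step]:
        return [self.fetch_references, self.count_references]
```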

Rules are defined in agent.rules. Each rule is an instance of Rule declared at the top level of that module. Rules rely on the event types defined in arxiv.submission.domain.events and on the processes defined in agent.process.

agent.runner provides tools for running Process instances. The base ProcessRunner carries out the process in a single thread, which may be useful for testing purposes. The runner used in production is the AsyncProcessRunner, which manages registration and dispatching of asynchronous tasks carried out by the agent.worker.
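The difference between the two runners comes down to who waits for the result. The sketch below assumes that a process is just an ordered list of step callables. It uses a concurrent.futures.ThreadPoolExecutor as a stand-in for the agent.worker. The class names mirror the ones described above, but the code is not their implementation.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List, Optional

Step = Callable[[Any], Any]


class ProcessRunner:
    """Runs a process's steps one after another in the calling thread."""

    def run(self, steps: List[Step]) -> Any:
        result: Optional[Any] = None
        for step in steps:
            result = step(result)  # each step receives the previous result
        return result


class AsyncProcessRunner:
    """Hands the process off and returns at once.

    A thread pool stands in for the agent.worker in this sketch.
    """

    def __init__(self, executor: ThreadPoolExecutor) -> None:
        self._executor = executor
        self._runner = ProcessRunner()

    def run(self, steps: List[Step]) -> Future:
        # submit() does not wait, so the caller can move on immediately.
        return self._executor.submit(self._runner.run, steps)
```

Because AsyncProcessRunner.run returns a Future immediately, a slow step never blocks the consumer's loop. ProcessRunner's blocking behaviour, by contrast, makes its results easy to assert on in tests.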

class agent.consumer.DatabaseCheckpointManager(shard_id)

Bases: object

Provides db-backed loading and updating of consumer checkpoints.

checkpoint(position)

Checkpoint at position.

Return type

None

class agent.consumer.SubmissionEventConsumer(*args, config={}, **kwargs)

Bases: arxiv.integration.kinesis.consumer.BaseConsumer

Consumes submission events, and dispatches processes based on rules.

new_client()

Generate a new Kinesis client.

Return type

Kinesis client (as returned by boto3.client)

process_record(record)

Evaluate an event against registered rules.

Parameters (fields of record)
  • data (bytes) –

  • partition_key (bytes) –

  • sequence_number (int) –

  • sub_sequence_number (int) –

Return type

None

process_records(start)

Update secrets before getting a new batch of records.

Return type

Tuple[str, int]

sleep = 0.2
sleep_after_credentials = 10
update_secrets()

Update any secrets that are out of date.

Return type

bool
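The override pattern described for process_records can be sketched as follows. Several details are assumptions rather than documented behaviour:

- the returned tuple is (next position, record count);
- update_secrets returns True when a secret was replaced;
- the consumer then pauses for sleep_after_credentials, taken to be seconds, before continuing.

BaseConsumer and SecretsAwareConsumer are hypothetical stand-ins. The refresh and sleep arguments exist only to make the sketch testable.

```python
import time
from typing import Callable, Tuple


class BaseConsumer:
    """Hypothetical stand-in for the base consumer's batch handling."""

    def process_records(self, start: str) -> Tuple[str, int]:
        # The real base class reads a batch from Kinesis. Here we pretend one
        # record was handled and return (next position, record count).
        return start, 1


class SecretsAwareConsumer(BaseConsumer):
    sleep_after_credentials = 10  # value from the docs above; unit assumed seconds

    def __init__(self, refresh: Callable[[], bool],
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self._refresh = refresh  # assumed: returns True if any secret changed
        self._sleep = sleep

    def update_secrets(self) -> bool:
        return self._refresh()

    def process_records(self, start: str) -> Tuple[str, int]:
        # Refresh credentials before every batch, so rotated secrets are
        # picked up without restarting the consumer.
        if self.update_secrets():
            # Assumption: give new credentials time to take effect.
            self._sleep(self.sleep_after_credentials)
        return super().process_records(start)
```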

wait_for_stream(tries=5, delay=5, max_delay=None, backoff=2, jitter=0)

Wait for the stream to become available.

If the stream becomes available, returns None. Otherwise, raises a StreamNotAvailable exception.

Raises

StreamNotAvailable – Raised when the stream could not be reached.

Return type

None
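The keyword arguments of wait_for_stream (tries, delay, max_delay, backoff, jitter) match those of the retry decorator in the `retry` package. This sketch therefore assumes the same exponential-backoff semantics, with delays in seconds. With the defaults, a stream that never appears is polled five times, with waits of 5, 10, 20 and 40 seconds between attempts. That adds up to 75 seconds before StreamNotAvailable is raised. backoff_delays, is_available and the injectable sleep are hypothetical helpers, not part of agent.consumer.

```python
import time
from typing import Callable, List, Optional


class StreamNotAvailable(Exception):
    """Raised when the stream could not be reached."""


def backoff_delays(tries: int = 5, delay: float = 5, max_delay: Optional[float] = None,
                   backoff: float = 2, jitter: float = 0) -> List[float]:
    """Waits between attempts: one fewer than the number of tries."""
    delays = []
    for _ in range(tries - 1):
        delays.append(delay)
        delay = delay * backoff + jitter  # grow the next wait
        if max_delay is not None:
            delay = min(delay, max_delay)  # cap it if a ceiling is set
    return delays


def wait_for_stream(is_available: Callable[[], bool], tries: int = 5, delay: float = 5,
                    max_delay: Optional[float] = None, backoff: float = 2,
                    jitter: float = 0, sleep: Callable[[float], None] = time.sleep) -> None:
    """Poll until is_available() is true; raise StreamNotAvailable if it never is."""
    schedule = backoff_delays(tries, delay, max_delay, backoff, jitter)
    for attempt in range(tries):
        if is_available():
            return None
        if attempt < len(schedule):  # no wait after the final attempt
            sleep(schedule[attempt])
    raise StreamNotAvailable("stream did not become available")
```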

agent.consumer.process_stream(app, duration=None)

Configure and run the record processor.

Parameters

duration (int) – Time (in seconds) to run record processing. If None (default), will run “forever”.

Return type

None
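One way to honour the documented duration contract is a loop with an optional deadline. run_for and process_batch are assumptions made for illustration, as is the 0.2-second pause (borrowed from the consumer's sleep attribute). The real process_stream also configures the record processor before running it, which this sketch omits.

```python
import time
from typing import Callable, Optional


def run_for(process_batch: Callable[[], None], duration: Optional[float] = None,
            sleep: float = 0.2) -> int:
    """Call process_batch repeatedly.

    Stops after `duration` seconds, or never if duration is None.
    Returns the number of batches processed.
    """
    deadline = None if duration is None else time.monotonic() + duration
    batches = 0
    while deadline is None or time.monotonic() < deadline:
        process_batch()
        batches += 1
        time.sleep(sleep)  # pause between batches
    return batches
```

Using time.monotonic rather than wall-clock time keeps the deadline correct even if the system clock is adjusted while the consumer is running.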

agent.consumer.start_agent()

Start the record processor.

Return type

None