agent.consumer module
Submission event consumer.
The submission event consumer is responsible for monitoring submission events,
evaluating them against pre-defined rules, and triggering processes to be
carried out by the agent.worker.
The consumer is implemented using arxiv.integration.kinesis, and consumes
events in the SubmissionEvents Kinesis stream. Events may be generated by
submission user interfaces, APIs, and backend components that leverage the
arxiv.submission core package. As events are consumed, they are evaluated
against a set of registered Rule instances, which map event types and
conditions to ProcessType classes.
Triggered processes are not run in the consumer application itself, which runs
as a single thread. So that the consumer can move on to the next event as
quickly as possible, it uses the AsyncProcessRunner to dispatch processes for
parallel execution by the worker.
The event lifecycle from the perspective of the consumer looks like this:
1. A command/event is generated by a submission service, using the
   arxiv.submission package. The event is stored in the database, and
   propagated via the SubmissionEvents Kinesis stream. The Kinesis payload
   includes the event itself, and the state of the submission both before and
   after the event was applied.
2. The event is consumed by the agent via the SubmissionEvents Kinesis stream.
3. The agent evaluates the event against registered Rule instances, using
   rules.evaluate(). A Rule maps a condition (the event type and
   event/submission properties) to a Process.
4. The agent dispatches any triggered Process instances to the agent.worker
   using the AsyncProcessRunner.
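The lifecycle above can be sketched in a few lines of Python. This is a minimal illustration only, not the agent's actual code: the Event, Process, Rule, and RecordingRunner classes below are simplified stand-ins, the payload keys ("event", "before", "after") are assumed, and the event and process names are placeholders.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, Iterator, List, Type


@dataclass
class Event:
    """Hypothetical stand-in for a submission event."""
    event_type: str
    data: Dict[str, Any]


class Process:
    """Hypothetical stand-in for a process triggered by a rule."""

    def __init__(self, event: Event) -> None:
        self.event = event


class SendConfirmation(Process):
    """Illustrative process; the name is a placeholder."""


@dataclass
class Rule:
    """Maps an event type plus a condition to a process class."""
    event_type: str
    condition: Callable[[Event, dict, dict], bool]
    process: Type[Process]


def evaluate(rules: Iterable[Rule], event: Event,
             before: dict, after: dict) -> Iterator[Process]:
    """Yield a process instance for every rule that matches the event."""
    for rule in rules:
        if (rule.event_type == event.event_type
                and rule.condition(event, before, after)):
            yield rule.process(event)


class RecordingRunner:
    """Collects dispatched processes instead of sending them to a worker."""

    def __init__(self) -> None:
        self.dispatched: List[Process] = []

    def run(self, process: Process) -> None:
        self.dispatched.append(process)


def handle_payload(payload: dict, rules: Iterable[Rule],
                   runner: RecordingRunner) -> None:
    """Steps 2 to 4: consume a payload, evaluate rules, dispatch processes."""
    event = Event(payload["event"]["event_type"], payload["event"])
    for process in evaluate(rules, event, payload["before"], payload["after"]):
        runner.run(process)
```

Because the consumer only evaluates conditions and hands processes to a runner, handling one record stays cheap even when the triggered process itself is slow.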
Components
The SubmissionEventConsumer defines how records from the SubmissionEvents
stream are handled. This is the primary point of control in the agent. As
events are received, it relies on agent.rules to determine what processes to
carry out, and then dispatches those processes to the agent.worker using the
AsyncProcessRunner.
The SubmissionEventConsumer relies on the DatabaseCheckpointManager to keep
track of its progress in the SubmissionEvents stream.
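As a rough illustration of what checkpointing means here, the sketch below keeps the last processed position per shard so that a restarted consumer can resume where it left off. It is not the DatabaseCheckpointManager itself: the class name, the dict standing in for the agent database, and the position attribute are assumptions made for the example; only the checkpoint(position) method mirrors the documented interface.

```python
from typing import Dict, Optional


class InMemoryCheckpointManager:
    """Hypothetical stand-in for a db-backed checkpoint manager."""

    def __init__(self, shard_id: str, store: Dict[str, str]) -> None:
        self.shard_id = shard_id
        self._store = store  # stands in for the agent database
        # Load the last known position for this shard, if there is one.
        self.position: Optional[str] = store.get(shard_id)

    def checkpoint(self, position: str) -> None:
        """Record that everything up to ``position`` has been handled."""
        self._store[self.shard_id] = position
        self.position = position
```

A second manager created for the same shard and the same store starts with the position saved by the first, which is the behavior a restarted consumer depends on.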
The agent.services.database integration module provides access to the agent
database. Specifically, it supports creating and loading checkpoints, and
storing information about process-relevant events.
Processes are defined in agent.process. Each process is a subclass of Process,
and may have one or more steps.
Rules are defined in agent.rules. Each rule is an instance of Rule, created in
the root of that module. The module relies on the event types defined in
arxiv.submission.domain.events, and the processes defined in agent.process.
agent.runner provides tools for running Process instances. The base
ProcessRunner carries out the process in a single thread, which may be useful
for testing purposes. The runner used in production is the AsyncProcessRunner,
which manages registration and dispatching of asynchronous tasks carried out
by the agent.worker.
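The difference between the two runners can be illustrated with a small sketch. Everything here is hypothetical: SketchProcess, InlineRunner, and ThreadedRunner are not the agent's classes, and a thread pool from the standard library stands in for the agent.worker, whose actual task machinery is not described in this section.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List


class SketchProcess:
    """Hypothetical process: an ordered list of steps.

    Each step receives the result of the previous step.
    """

    def __init__(self, steps: List[Callable[[Any], Any]]) -> None:
        self.steps = steps


class InlineRunner:
    """Runs every step in the calling thread, which is handy in tests."""

    def run(self, process: SketchProcess, trigger: Any) -> Any:
        result = trigger
        for step in process.steps:
            result = step(result)
        return result


class ThreadedRunner:
    """Hands the process to a pool and returns immediately with a Future."""

    def __init__(self, max_workers: int = 2) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._inline = InlineRunner()

    def run(self, process: SketchProcess, trigger: Any) -> Future:
        # The caller is not blocked while the steps execute.
        return self._pool.submit(self._inline.run, process, trigger)

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)
```

With the inline runner the caller waits for the final result; with the threaded runner the caller gets a Future back at once and can move on to the next event.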
class agent.consumer.DatabaseCheckpointManager(shard_id)
    Bases: object

    Provides db-backed loading and updating of consumer checkpoints.

    checkpoint(position)
        Checkpoint at position.

        Return type: None

class agent.consumer.SubmissionEventConsumer(*args, config={}, **kwargs)
    Bases: arxiv.integration.kinesis.consumer.BaseConsumer

    Consumes submission events, and dispatches processes based on rules.

    new_client()
        Generate a new Kinesis client.

        Return type: Kinesis client

    process_record(record)
        Evaluate an event against registered rules.

    process_records(start)
        Update secrets before getting a new batch of records.

    sleep = 0.2

    sleep_after_credentials = 10

    wait_for_stream(tries=5, delay=5, max_delay=None, backoff=2, jitter=0)
        Wait for the stream to become available.

        If the stream becomes available, returns None. Otherwise, raises a
        StreamNotAvailable exception.

        Raises: StreamNotAvailable – Raised when the stream could not be reached.

        Return type: None
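The parameters of wait_for_stream suggest a retry loop with exponential backoff. The sketch below shows one plausible reading of them, assuming that the delay is multiplied by backoff and increased by jitter after each failed attempt, and capped at max_delay; the documentation above does not confirm these semantics. The function name, the is_available callback, the injectable sleep argument, and the local StreamNotAvailable class are all introduced for the example.

```python
import time
from typing import Callable, Optional


class StreamNotAvailable(RuntimeError):
    """Local stand-in for the exception named in the documentation."""


def wait_until_available(is_available: Callable[[], bool],
                         tries: int = 5,
                         delay: float = 5,
                         max_delay: Optional[float] = None,
                         backoff: float = 2,
                         jitter: float = 0,
                         sleep: Callable[[float], None] = time.sleep) -> None:
    """Poll ``is_available`` up to ``tries`` times, backing off in between."""
    for attempt in range(tries):
        if is_available():
            return None
        if attempt == tries - 1:
            break  # no point sleeping after the final attempt
        sleep(delay)
        # Assumed semantics: grow the delay, add jitter, then apply the cap.
        delay = delay * backoff + jitter
        if max_delay is not None:
            delay = min(delay, max_delay)
    raise StreamNotAvailable("stream could not be reached")
```

Under these assumptions the default arguments would wait 5, 10, 20, and 40 seconds between the five attempts before giving up.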

agent.consumer.process_stream(app, duration=None)
    Configure and run the record processor.

    Parameters: duration (int) – Time (in seconds) to run record processing. If None (default), will run “forever”.

    Return type: None

agent.consumer.start_agent()
    Start the record processor.

    Return type: None