agent.services.database module

Lightweight database integration for checkpointing.

class agent.services.database.Checkpoint(**kwargs)

Bases: sqlalchemy.ext.declarative.api.Model

Stores checkpoint information for the Kinesis consumer.

created
id
position
shard_id

class agent.services.database.ProcessStatusEvent(**kwargs)

Bases: sqlalchemy.ext.declarative.api.Model

Stores events related to processes.

agent_id
agent_type
created
event_id
id
process
process_id
reason
received
status
submission_id

exception agent.services.database.Unavailable

Bases: OSError

The database is not available.

agent.services.database.await_connection(max_wait=-1)

Wait for the database to be available.

Return type

None
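
The waiting behaviour can be pictured as a polling loop. The following is only a minimal sketch, not the module's implementation. It assumes that a negative max_wait means "wait indefinitely" and that Unavailable is raised once the limit is exceeded; neither is stated in this reference. The names wait_for, interval, and fake_is_available are hypothetical, and Unavailable here is a local stand-in for agent.services.database.Unavailable.

```python
import time
from typing import Callable


class Unavailable(OSError):
    """Local stand-in for agent.services.database.Unavailable."""


def wait_for(check: Callable[[], bool], max_wait: float = -1,
             interval: float = 0.01) -> None:
    """Poll ``check`` until it returns True.

    Assumption: a negative ``max_wait`` means "no time limit".
    """
    start = time.monotonic()
    while not check():
        # Give up only when a non-negative limit has been exceeded.
        if max_wait >= 0 and time.monotonic() - start >= max_wait:
            raise Unavailable("database not available after %s s" % max_wait)
        time.sleep(interval)


# A fake availability check that succeeds on the third attempt.
attempts = {"n": 0}


def fake_is_available() -> bool:
    attempts["n"] += 1
    return attempts["n"] >= 3


wait_for(fake_is_available, max_wait=1)
```

In a real application the check passed in would presumably be is_available(), and a caller that cannot proceed without the database would let Unavailable propagate.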

agent.services.database.create_all()

Create all tables in the agent database.

Return type

None

agent.services.database.get_latest_position(shard_id)

Get the latest checkpointed position.

Return type

str

agent.services.database.init_app(app)

Set configuration defaults and attach the database session to the application.

Return type

None

agent.services.database.is_available(**kwargs)

Check whether the connection to the database is working.

Return type

bool

agent.services.database.store_event(event)

Store an AddProcessStatus event.

Return type

None

agent.services.database.store_position(position, shard_id)

Store a new checkpoint position.

Return type

None
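
The interplay between store_position() and get_latest_position() can be illustrated with a small, self-contained sketch. It uses the standard-library sqlite3 module instead of the SQLAlchemy model so that it runs without the agent package. The table layout mirrors the attributes listed for Checkpoint, but the column types, the function names save_position and latest_position, and the choice to return None when a shard has no checkpoint are all assumptions made for illustration.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

# In-memory stand-in for the agent database (assumed schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE checkpoint ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " position TEXT NOT NULL,"
    " shard_id TEXT NOT NULL,"
    " created TEXT NOT NULL)"
)


def save_position(position: str, shard_id: str) -> None:
    """Append a new checkpoint row for the shard."""
    conn.execute(
        "INSERT INTO checkpoint (position, shard_id, created) VALUES (?, ?, ?)",
        (position, shard_id, datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()


def latest_position(shard_id: str) -> Optional[str]:
    """Return the most recently stored position for the shard, if any."""
    row = conn.execute(
        "SELECT position FROM checkpoint WHERE shard_id = ?"
        " ORDER BY id DESC LIMIT 1",
        (shard_id,),
    ).fetchone()
    return row[0] if row else None


save_position("seq-1", "shard-0")
save_position("seq-2", "shard-0")
resume_from = latest_position("shard-0")
unknown = latest_position("shard-1")
```

A consumer restarting on shard-0 would resume from the value in resume_from. Appending rows rather than updating one in place keeps a history of checkpoints, at the cost of a growing table; whether the real module does this is not stated here.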

agent.services.database.tables_exist()

Determine whether the tables in the agent database exist.

Return type

bool
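
A check such as tables_exist() is often paired with create_all() at start-up so that tables are created only when they are missing. The sketch below shows that pattern with the standard-library sqlite3 module. The table names, the reduced column sets, and the helpers schema_exists and ensure_schema are hypothetical and are not taken from the agent package.

```python
import sqlite3

# Hypothetical table definitions (a reduced subset of columns).
DDL = {
    "checkpoint": (
        "CREATE TABLE IF NOT EXISTS checkpoint ("
        " id INTEGER PRIMARY KEY, position TEXT, shard_id TEXT, created TEXT)"
    ),
    "process_status_event": (
        "CREATE TABLE IF NOT EXISTS process_status_event ("
        " id INTEGER PRIMARY KEY, event_id TEXT, process TEXT, status TEXT)"
    ),
}


def schema_exists(conn: sqlite3.Connection) -> bool:
    """Return True only if every expected table is present."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    ).fetchall()
    found = {name for (name,) in rows}
    return set(DDL) <= found


def ensure_schema(conn: sqlite3.Connection) -> None:
    """Create any missing tables; safe to call more than once."""
    for statement in DDL.values():
        conn.execute(statement)
    conn.commit()


conn = sqlite3.connect(":memory:")
before = schema_exists(conn)
if not before:
    ensure_schema(conn)
after = schema_exists(conn)
```

With the real module, the equivalent start-up sequence would presumably wait for the database first, then call tables_exist() and, if it returns False, create_all().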