arxiv.base.agent.tests module

Tests for BaseConsumer.

class arxiv.base.agent.tests.TestBaseConsumer(methodName='runTest')[source]

Bases: unittest.case.TestCase

Test BaseConsumer behavior and public methods.

setUp()[source]

Create a mock checkpointer.

test_init(mock_client_factory)[source]

On init, consumer should wait for stream to be available.

test_init_stream_not_available(mock_client_factory)[source]

If the stream is not available, the consumer should raise an exception.

test_iteration(mock_client_factory)[source]

Test iteration behavior.

test_process_records_until_shard_closes(mock_client_factory)[source]

The consumer should call GetRecords until no next shard iterator is available.
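
The shard-draining behavior this test checks can be sketched as follows. This is a minimal illustration under stated assumptions, not the BaseConsumer implementation: `drain_shard` is a hypothetical helper, the client is a `unittest.mock.MagicMock` standing in for a Kinesis client, and the responses are made up. It relies only on the shape of a GetRecords response, in which `NextShardIterator` is missing once the shard has closed.

```python
from unittest import mock


def drain_shard(client, iterator):
    """Hypothetical helper: call get_records until no next iterator remains."""
    records = []
    while iterator is not None:
        response = client.get_records(ShardIterator=iterator)
        records.extend(response.get('Records', []))
        # A closed shard returns no NextShardIterator, which ends the loop.
        iterator = response.get('NextShardIterator')
    return records


# Stand-in for a Kinesis client; the responses are invented for illustration.
client = mock.MagicMock()
client.get_records.side_effect = [
    {'Records': [{'SequenceNumber': '1'}], 'NextShardIterator': 'iter-2'},
    {'Records': [{'SequenceNumber': '2'}], 'NextShardIterator': 'iter-3'},
    {'Records': [{'SequenceNumber': '3'}]},  # shard closed: no next iterator
]

records = drain_shard(client, 'iter-1')
```

A test in this style would then assert on `client.get_records.call_count` to confirm that polling stopped when the iterator ran out.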

test_process_records_with_clienterror(mock_client_factory)[source]

If a ClientError occurs while processing records, the consumer should try to checkpoint before exiting.
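
One way to picture the checkpoint-before-exit behavior is the sketch below. Everything in it is an assumption made for illustration: `process_records` is a hypothetical function, `FakeClientError` stands in for `botocore.exceptions.ClientError` so that the example needs only the standard library, and the `checkpoint` method name on the mock checkpointer is invented rather than taken from the module.

```python
from unittest import mock


class FakeClientError(Exception):
    """Stand-in for botocore.exceptions.ClientError (assumption)."""


def process_records(client, checkpointer, iterator):
    """Hypothetical loop: record the last seen position even if a call fails."""
    position = None
    try:
        while iterator is not None:
            response = client.get_records(ShardIterator=iterator)
            for record in response.get('Records', []):
                position = record['SequenceNumber']
            iterator = response.get('NextShardIterator')
    except FakeClientError:
        # Stop processing; the finally clause below still saves progress.
        pass
    finally:
        if position is not None:
            checkpointer.checkpoint(position)
    return position


# The first call succeeds; the second raises the stand-in client error.
client = mock.MagicMock()
client.get_records.side_effect = [
    {'Records': [{'SequenceNumber': '41'}, {'SequenceNumber': '42'}],
     'NextShardIterator': 'iter-2'},
    FakeClientError('simulated failure'),
]
checkpointer = mock.MagicMock()

last_position = process_records(client, checkpointer, 'iter-1')
```

Placing the checkpoint call in `finally` means progress is saved on both the normal and the error path, which is the property the test is concerned with.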

test_start_from_position(mock_client_factory)[source]

Consumer is initialized with start_type ‘AT_TIMESTAMP’.

test_start_from_timestamp(mock_client_factory)[source]

Consumer is initialized with start_type ‘AT_TIMESTAMP’.

test_start_from_trim_horizon(mock_client_factory)[source]

Consumer is initialized with start_type ‘TRIM_HORIZON’.
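
The start-type tests above concern how a consumer requests its first shard iterator. As a rough sketch, assuming the start type is passed through as the `ShardIteratorType` parameter of the Kinesis GetShardIterator call, the request arguments might be assembled like this. `shard_iterator_kwargs` is a hypothetical helper, and the stream name, shard ID, and date are placeholders; none of them come from the module itself.

```python
from datetime import datetime, timezone


def shard_iterator_kwargs(stream_name, shard_id, start_type, start_at=None):
    """Hypothetical helper: build keyword arguments for get_shard_iterator."""
    kwargs = {
        'StreamName': stream_name,
        'ShardId': shard_id,
        'ShardIteratorType': start_type,
    }
    if start_type == 'AT_TIMESTAMP':
        # Kinesis needs a Timestamp when the iterator type is AT_TIMESTAMP.
        if start_at is None:
            raise ValueError('AT_TIMESTAMP requires a start time')
        kwargs['Timestamp'] = start_at
    return kwargs


# Placeholder values, chosen only for illustration.
at_time = shard_iterator_kwargs(
    'foo', '1', 'AT_TIMESTAMP', datetime(2018, 1, 1, tzinfo=timezone.utc)
)
from_start = shard_iterator_kwargs('foo', '1', 'TRIM_HORIZON')
```

A test can then inspect the keyword arguments passed to a mocked `get_shard_iterator` and compare `ShardIteratorType` with the expected start type.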

class arxiv.base.agent.tests.TestProcessStream(methodName='runTest')[source]

Bases: unittest.case.TestCase

Tests for process_stream().

setUp()[source]

Define a testing config.

test_process_stream(mock_client_factory)[source]

Run process_stream() with a vanilla config.