# Source code for search.agent.tests.test_integration

"""Integration tests for :mod:`search.agent` with Kinesis."""

from unittest import TestCase, mock
import os
import time
import subprocess
import tempfile
import boto3
import json
import threading

from search.agent import process_stream
from arxiv.base.agent import StopProcessing
from search.services import metadata
from search.domain import DocMeta
from search.factory import create_ui_web_app

# Directory holding the ``<document_id>.json`` fixtures that the tests load
# into ``DocMeta`` objects; resolved relative to this module's own location
# so the tests do not depend on the current working directory.
BASE_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    '../../../tests/data/examples',
)


class TestKinesisIntegration(TestCase):
    """Test :class:`.MetadataRecordProcessor` with a live Kinesis stream.

    The Kinesis endpoint is served by a throwaway ``atlassianlabs/localstack``
    docker container started in :meth:`setUpClass`. The class is only
    collected when the ``WITH_INTEGRATION`` environment variable is non-empty
    (any non-empty value, including ``"0"``, enables it).
    """

    __test__ = bool(os.environ.get('WITH_INTEGRATION', False))

    @classmethod
    def setUpClass(cls):
        """Start localstack, create the Kinesis stream, and build the web app.

        Elasticsearch and Kinesis settings are patched into ``os.environ`` for
        the lifetime of the class and restored in :meth:`tearDownClass`, so
        they do not leak into other tests run in the same process.

        Raises
        ------
        RuntimeError
            If the localstack container cannot be started.
        """
        cls.checkpoint_dir = tempfile.mkdtemp()
        cls.env_patcher = mock.patch.dict(os.environ, {
            'ELASTICSEARCH_SERVICE_HOST': 'localhost',
            'ELASTICSEARCH_SERVICE_PORT': '9201',
            'ELASTICSEARCH_PORT_9201_PROTO': 'http',
            'ELASTICSEARCH_VERIFY': 'false',
            'KINESIS_STREAM': 'MetadataIsAvailable',
            'KINESIS_SHARD_ID': '0',
            'KINESIS_CHECKPOINT_VOLUME': cls.checkpoint_dir,
            'KINESIS_ENDPOINT': 'http://127.0.0.1:6568',
            'KINESIS_VERIFY': 'false',
            'KINESIS_START_TYPE': 'TRIM_HORIZON',
        })
        cls.env_patcher.start()

        print('pulling localstack image')
        # Result deliberately ignored (as before): if the image is unavailable,
        # ``docker run`` below fails and its error is reported.
        subprocess.run(
            ['docker', 'pull', 'atlassianlabs/localstack'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        print('starting localstack')
        # Publish container port 4568 on host port 6568, the port the boto3
        # client and KINESIS_ENDPOINT point at.
        start_localstack = subprocess.run(
            ['docker', 'run', '-d', '-p', '6568:4568', '--name', 'ltest',
             'atlassianlabs/localstack'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if start_localstack.returncode != 0:
            cls._restore_environment()
            # docker reports failures on stderr, not stdout.
            reason = start_localstack.stderr.decode('utf-8', 'replace').strip()
            raise RuntimeError(
                f'Could not start localstack: {reason}.'
                f' Is one already running? Is port 6568 available?'
            )
        # ``docker run -d`` prints the new container's id on stdout.
        cls.ls_container = start_localstack.stdout.decode('ascii').strip()
        print(f'localstack started as {cls.ls_container}')

        try:
            cls.client = boto3.client(
                'kinesis',
                region_name='us-east-1',
                endpoint_url="http://localhost:6568",
                aws_access_key_id='foo',
                aws_secret_access_key='bar',
                verify=False,
            )
            print('creating stream ahead of time, to populate with records')
            cls.client.create_stream(
                StreamName='MetadataIsAvailable',
                ShardCount=1,
            )
            # Kinesis CreateStream is asynchronous; give the stream time to
            # become usable before the test writes records to it.
            time.sleep(5)
            print('created stream, ready to test')
            cls.app = create_ui_web_app()
        except BaseException:
            # unittest does not call tearDownClass when setUpClass raises, so
            # release the container (and its name/port) and env here.
            cls._remove_container()
            cls._restore_environment()
            raise

    @classmethod
    def tearDownClass(cls):
        """Remove the localstack container and restore the environment."""
        cls._remove_container()
        cls._restore_environment()

    @classmethod
    def _remove_container(cls):
        """Force-remove the localstack container; failures are ignored."""
        subprocess.run(
            ['docker', 'rm', '-f', cls.ls_container],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

    @classmethod
    def _restore_environment(cls):
        """Undo the ``os.environ`` patch and delete the checkpoint directory."""
        import shutil  # only needed for cleanup
        cls.env_patcher.stop()
        shutil.rmtree(cls.checkpoint_dir, ignore_errors=True)

    @mock.patch('search.agent.consumer.index')
    @mock.patch('search.agent.consumer.metadata')
    def test_process(self, mock_metadata, mock_index):
        """Add some records to the stream, and run processing loop for 30s.

        ``metadata`` and ``index`` as seen by :mod:`search.agent.consumer` are
        mocked; ``metadata.retrieve`` loads ``DocMeta`` fixtures from
        ``BASE_PATH``.
        """
        to_index = [
            "1712.04442",    # flux capacitor
            "1511.07473",    # flux capacitor
            "1604.04228",    # flux capacitor
            "1403.6219",     # λ
            "1404.3450",     # $z_1$
            "1703.09067",    # $\lambda$
            "1408.6682",     # $\lambda$
            "1607.05107",    # Schröder
            "1509.08727",    # Schroder
            "1710.01597",    # Schroeder
            "1708.07156",    # w w
            "1401.1012",     # Wonmin Son
        ]
        for document_id in to_index:
            data = json.dumps({'document_id': document_id}).encode('utf-8')
            self.client.put_record(
                StreamName='MetadataIsAvailable',
                Data=data,
                PartitionKey='0',
            )

        def retrieve(document_id):
            """Load the fixture for ``document_id`` as a ``DocMeta``."""
            # Fixtures contain non-ASCII text (e.g. "λ", "Schröder"); do not
            # rely on the platform's default encoding.
            path = os.path.join(BASE_PATH, f'{document_id}.json')
            with open(path, encoding='utf-8') as f:
                return DocMeta(**json.load(f))

        mock_metadata.retrieve.side_effect = retrieve
        # NOTE(review): only ``retrieve`` is backed by fixtures, yet the
        # assertion below checks ``bulk_retrieve``, which returns a bare
        # MagicMock. Confirm which call the consumer actually makes.

        # Expose the real exception classes on the mock so that ``except
        # metadata.<Error>`` clauses in the consumer still match.
        mock_metadata.RequestFailed = metadata.RequestFailed
        mock_metadata.SecurityException = metadata.SecurityException
        mock_metadata.ConnectionFailed = metadata.ConnectionFailed
        mock_metadata.BadResponse = metadata.BadResponse

        with self.app.app_context():
            try:
                process_stream(duration=30)
            except StopProcessing:
                # Presumably how the processing loop ends; not a failure.
                pass
        self.assertGreater(mock_metadata.bulk_retrieve.call_count, 0)