Source code for arxiv.canonical.services.remote
import io
import time
from http import HTTPStatus as status
from typing import Any, Callable, Iterable, IO, Optional, Tuple, Union
from urllib3.util.retry import Retry
import requests
from .. import domain as D
from .. import record as R
from ..register import ICanonicalSource
from .readable import IterReadWrapper, BytesIOProxy
[docs]class RemoteSource(ICanonicalSource):
"""Retrieves content from remote URIs."""
def __init__(self, trusted_domain: str, trusted_scheme: str = 'https',
retry: int = 3, backoff: int = 2, retry_status: int = 5,
force_retry_on: Iterable[status] = (
status.INTERNAL_SERVER_ERROR,
status.BAD_GATEWAY,
status.SERVICE_UNAVAILABLE,
status.GATEWAY_TIMEOUT
)) -> None:
self._trusted_scheme = trusted_scheme
self._trusted_domain = trusted_domain
self._session = requests.Session()
self._adapter = requests.adapters.HTTPAdapter(
max_retries=Retry(
total=retry * 3, # This is just a fallback for odd cases.
read=retry,
connect=retry,
backoff_factor=backoff,
status_forcelist=[code.value for code in force_retry_on],
status=retry_status
)
)
self._session.mount('http://', self._adapter)
self._session.mount('https://', self._adapter)
[docs] def can_resolve(self, uri: D.URI) -> bool:
return self.__can_resolve(uri)
def __can_resolve(self, uri: D.URI) -> bool:
return bool(uri.is_http_url
and uri.netloc == self._trusted_domain
and uri.scheme == self._trusted_scheme)
[docs] def load_entry(self, key: D.URI) -> Tuple[R.RecordStream, str]:
"""Load an entry from the record."""
raise NotImplementedError('Implement me!')
[docs] def load(self, key: D.URI, stream: bool = True) -> IO[bytes]:
"""Make an IO that waits to load from the record until it is read()."""
if not self.__can_resolve(key):
raise RuntimeError(f'Cannot resolve URI: {key}')
return DeferredRequestReader(self._session.get, key, stream=stream)
[docs]class DeferredRequestReader(io.BytesIO):
"""IO[bytes] object that reads lazily via an HTTP request."""
def __init__(self, method: Callable[..., requests.Response],
uri: D.URI, stream: bool = True) -> None:
self._method = method
self._uri = uri
self._stream = stream
self._loaded_reader: Optional[IO[bytes]] = None
@property
def _reader(self) -> IO[bytes]:
if self._loaded_reader is None:
self._loaded_reader = self._get_reader()
return self._loaded_reader
def _get_reader(self) -> IO[bytes]:
response = self._method(str(self._uri), stream=self._stream)
while response.status_code == 200 and 'Refresh' in response.headers:
time.sleep(int(response.headers['Refresh']))
response = self._method(str(self._uri), stream=self._stream)
if response.status_code != 200:
# logger.error('%i: %s', response.status_code, response.headers)
raise IOError(f'Could not retrieve {self._uri}:'
f' {response.status_code}')
return IterReadWrapper(response.iter_content)
[docs] def read(self, size: Optional[int] = -1) -> bytes:
"""Read from the remote resource."""
if size is None:
size = -1
return self._reader.read(size)
[docs] def seek(self, offset: int, whence: int = 0) -> None:
self._reader.seek(offset, whence=whence)
[docs] def seekable(self) -> bool:
self._reader.seekable()
[docs] def tell(self) -> bool:
self._reader.tell()