Source code for arxiv.canonical.services.repository

from http import HTTPStatus as status
from typing import IO, Iterable, Tuple, Union
from urllib.parse import urljoin
from urllib3.util.retry import Retry

import requests

from .. import domain as D
from .. import record as R
from ..register import ICanonicalSource
from .remote import RemoteSource


class RemoteRepository(RemoteSource):
    """Source that retrieves content from a remote arXiv repository."""

    def can_resolve(self, uri: D.URI) -> bool:
        """Return whether this source can resolve ``uri``."""
        return self.__can_resolve(uri)

    def __can_resolve(self, uri: D.URI) -> bool:
        # Resolvability is decided solely by the URI's ``is_canonical`` flag.
        return uri.is_canonical

    def _to_http(self, uri: D.URI) -> D.URI:
        """Build an HTTP URI from an arXiv canonical URI.

        The path of ``uri`` is joined onto a base made from the trusted
        scheme and trusted domain configured on this instance.
        """
        base = f'{self._trusted_scheme}://{self._trusted_domain}'
        return D.URI(urljoin(base, uri.path))

    def load_entry(self, key: D.URI) -> Tuple[R.RecordStream, str]:
        """Load an entry from the record.

        Raises
        ------
        NotImplementedError
            Always; this method has not been implemented yet.

        """
        raise NotImplementedError('Implement me!')

    def load(self, key: D.URI, stream: bool = True) -> IO[bytes]:
        """Return an IO for ``key`` that defers loading until it is read.

        The canonical ``key`` is first translated to its HTTP form, and the
        actual loading is delegated to the parent class with the same
        ``stream`` setting.
        """
        http_key = self._to_http(key)
        return super().load(http_key, stream=stream)