Source code for arxiv.canonical.core

"""Core interfaces for the canonical record."""

import io
import datetime
from typing import Any, Callable, Dict, IO, Iterable, List, Sequence, Tuple, Type, \
    TypeVar, Union

from typing_extensions import Protocol

from . import domain as D
from . import integrity as I
from . import record as R
from .manifest import Manifest


[docs]class IEventStream(Protocol): """Interface for the canonical event stream."""
[docs] def emit(self, event: D.Event) -> None: """ Emit an :class:`Event` on the stream. Parameters ---------- event : :class:`Event` """
[docs] def listen(self, on_event: Callable[[D.Event], None]) -> None: """ Listen for :class:`Event`s on the stream. Parameters ---------- on_event : callable This object will be called for every event that is received. It should accept a single argument, an :class:`.Event`, and is expected to return ``None``. """
[docs]class ICanonicalSource(Protocol): """Interface for source services, used to dereference URIs."""
[docs] def can_resolve(self, uri: D.URI) -> bool: """ Indicate whether or not the implementation can resolve an URI. Parameters ---------- uri : :class:`.D.URI` Returns ------- bool """
[docs] def load(self, key: D.URI) -> IO[bytes]: # pylint: disable=unused-argument; this is a stub. """ Make an IO that waits to load from the record until it is read(). Parameters ---------- key : :class:`D.URI` Returns ------- IO Yields bytes when read. This may be a lazy IO object, so that reading is deferred until the latest possible time. """
[docs]class IStorableEntry(Protocol): """ Minimal interface for a bitstream interface that can be stored. Services that implement :class:`.ICanonicalStorage` can assume that the attributes of this interface are available on objects passed for storing. """ name: str """Name of the entry.""" # pylint: disable=pointless-string-statement; this is a docstring. @property def checksum(self) -> str: """URL-safe b64-encoded md5 hash.""" @property def record(self) -> R.RecordEntry: """Reference to a :class:`.RecordEntry`."""
[docs] def update_checksum(self) -> None: """Update the integrity checksum for this entry."""
[docs]class IManifestStorage(Protocol): """ Manifest protocol. This could conceivably be stored separately from the canonical record content, so it is defined separately. """
[docs] def store_manifest(self, key: D.Key, manifest: Manifest) -> None: # pylint: disable=unused-argument; this is a stub. """ Store an integrity manifest. Parameters ---------- key : :class:`.Key` Key used to identify ``manifest`` in storage. manifest : :class:`.Manifest` The manifest record to store. """
[docs] def load_manifest(self, key: D.Key) -> Manifest: # pylint: disable=unused-argument; this is a stub. """ Load an integrity manifest. Parameters ---------- key : :class:`.Key` Key used to identify ``manifest`` in storage. Returns ------- :class:`.Manifest` """
_I = TypeVar('_I', I.IntegrityEntry, I.IntegrityMetadata, I.IntegrityListing, covariant=True)
[docs]class ICanonicalStorage(ICanonicalSource, IManifestStorage, Protocol): """Interface for services that store the canonical record."""
[docs] def list_subkeys(self, key: D.URI) -> List[str]: # pylint: disable=unused-argument; this is a stub. """ List all of the subkeys (direct descendants) of ``key`` in the record. Parameters ---------- key : :class:`.URI` Returns ------- list Items are the relative names of the descendants of ``key``. For filesystem-based storage, this may be equivalent to ``os.listdir``. """
[docs] def store_entry(self, ri: IStorableEntry) -> None: # pylint: disable=unused-argument; this is a stub. """ Store a bitstream entry in the record. This method MUST decompress the content of the entry if it is gzipped (as is sometimes the case in the classic system) and update the ``CanonicalFile`` (``ri.record.stream.domain``). Parameters ---------- ri : :class:`.IStorableEntry` A storable bitstream. """
[docs] def load_entry(self, key: D.URI) -> Tuple[R.RecordStream, str]: """ Load a bitstream entry. Parameters ---------- key : :class:`.URI` Key that identifies the bitsream in the record. Returns ------- :class:`.RecordStream` The bitstream resource. str Checksum of the bitstream (URL-safe base64-encoded md5 hash).ß """
Year = int """Years are represented as four-digit integers.""" Month = int """Months are represented as integers.""" YearMonth = Tuple[Year, Month] """A month in a particular year is represented as a 2-tuple of integers.""" Selector = Union[Year, YearMonth, datetime.date] """A selector can refer to a year, month, or a specific date.""" _ID = Union[D.VersionedIdentifier, D.Identifier]
[docs]class IRegisterAPI(Protocol): """Interface for the canonical register API."""
[docs] def add_events(self, *events: D.Event) -> None: """Add new events to the register."""
[docs] def load_version(self, identifier: D.VersionedIdentifier) -> D.Version: # pylint: disable=unused-argument; this is a stub. """Load an e-print :class:`.Version` from the record."""
[docs] def load_eprint(self, identifier: D.Identifier) -> D.EPrint: # pylint: disable=unused-argument; this is a stub. """Load an :class:`.EPrint` from the record."""
[docs] def load_history(self, identifier: _ID) -> Iterable[D.EventSummary]: # pylint: disable=unused-argument; this is a stub. """Load the event history of an :class:`.EPrint`."""
[docs] def load_event(self, identifier: str) -> D.Event: # pylint: disable=unused-argument; this is a stub. """Load an :class:`.Event` by identifier."""
[docs] def load_events(self, selector: Selector) -> Tuple[Iterable[D.Event], int]: # pylint: disable=unused-argument; this is a stub. """Load all :class:`.Event`s for a day, month, or year."""
[docs] def load_listing(self, date: datetime.date, # pylint: disable=unused-argument; this is a stub. shard: str = D.Event.get_default_shard()) -> D.Listing: # pylint: disable=no-member, unused-argument """Load a :class:`.Listing` for a particulate date."""
# TODO: implement me!
[docs]class IPreservationAPI(Protocol): """Interface for the daily preservation record API."""
[docs] def add_events(self, *events: D.Event) -> None: """Add new events to the preservation record."""
[docs] def load_package(self, date: datetime.date) -> I.IntegrityEntry: """Load the preservation package for a particular date."""
# TODO: consider a semantically more meaningful exception for failure to # dereference the URI.
[docs]def dereference(sources: Sequence[ICanonicalSource], uri: D.URI) -> IO[bytes]: """ Dereference an URI using a set of available sources. Sources are checked one at a time for ability to resolve the URI. When one is found, the URI is loaded. Parameters ---------- sources : sequence Items are content sources that should conform to :class:`.ICanonicalSource`. They will be tried in the order provided. uri : :class:`.URI` URI to dereference. Returns ------- io BytesIO object. Raises ------ :class:`RuntimeError` Raised when the URI cannot be resolved. """ for source in sources: if source.can_resolve(uri): return source.load(uri) raise RuntimeError(f'Cannot resolve URI: {uri}')