Source code for arxiv.canonical.record.listing


import datetime
from io import BytesIO
from json import dumps, load
from typing import Type, IO, Iterable, Tuple

from .core import RecordBase, RecordEntry, RecordStream, D, _Self, \
    Year, YearMonth


class RecordListing(RecordEntry[D.Listing]):
    """A listing entry."""

    @classmethod
    def from_domain(cls: Type[_Self], listing: D.Listing) -> _Self:
        """
        Serialize a :class:`.Listing`.

        The listing is encoded as UTF-8 JSON and wrapped in a
        :class:`.RecordStream`; the key is derived from the listing's
        identifier.

        Parameters
        ----------
        listing : :class:`.Listing`
            The domain object to serialize.

        Returns
        -------
        An instance of ``cls`` holding the key, the encoded stream, and the
        original ``listing`` as its domain object.

        """
        content, size_bytes = RecordListing._encode(listing)
        key = RecordListing.make_key(listing.identifier)
        return cls(
            key=key,
            stream=RecordStream(
                domain=D.CanonicalFile(
                    modified=listing.end_datetime,
                    size_bytes=size_bytes,
                    content_type=D.ContentType.json,
                    filename=key.filename,
                    ref=key
                ),
                content=content,
                content_type=D.ContentType.json,
                size_bytes=size_bytes
            ),
            domain=listing
        )

    @classmethod
    def from_stream(cls, key: D.Key, stream: RecordStream) -> 'RecordListing':
        """
        Build a record from an existing key and stream.

        The domain object is obtained by decoding ``stream`` with
        :meth:`to_domain`.
        """
        return cls(key=key, stream=stream, domain=cls.to_domain(stream))

    @classmethod
    def make_key(cls, identifier: D.ListingIdentifier) -> D.Key:
        """
        Make the key for the listing identified by ``identifier``.

        The key has the form
        ``announcement/YYYY/MM/DD/YYYY-MM-DD-<name>.json``.
        """
        prefix = cls.make_prefix(identifier.date)
        # Only the date goes through strftime. The prefix is already formatted
        # and the listing name is free text, so neither may be interpreted as
        # a format string (a ``%`` in the name would otherwise be expanded).
        date_part = identifier.date.strftime('%Y-%m-%d')
        value: str = f'{prefix}/{date_part}-{identifier.name}.json'
        return D.Key(value)

    @classmethod
    def make_prefix(cls, date: datetime.date) -> str:
        """Make the key prefix (``announcement/YYYY/MM/DD``) for ``date``."""
        return date.strftime('announcement/%Y/%m/%d')

    @classmethod
    def to_domain(cls, stream: RecordStream) -> D.Listing:
        """
        Deserialize the JSON content of ``stream`` to a :class:`.Listing`.

        If the underlying content is seekable, it is rewound to the start
        after reading.
        """
        assert stream.content is not None
        listing = D.Listing.from_dict(load(stream.content))
        # ``seekable`` is a method: it must be called, otherwise the bound
        # method object is always truthy and non-seekable streams would
        # fail on ``seek``.
        if stream.content.seekable():
            stream.content.seek(0)
        return listing

    @classmethod
    def _encode(cls, listing: D.Listing) -> Tuple[IO[bytes], int]:
        """Encode ``listing`` as UTF-8 JSON; return (stream, size in bytes)."""
        content = dumps(listing.to_dict()).encode('utf-8')
        return BytesIO(content), len(content)

    @property
    def created(self) -> datetime.datetime:
        """The start datetime of the underlying listing."""
        return self.domain.start_datetime

    @property
    def name(self) -> str:
        """The name of this kind of record; always ``'listing'``."""
        return 'listing'
class RecordListingDay(RecordBase[datetime.date,
                                  D.ListingIdentifier,
                                  RecordListing,
                                  D.ListingDay]):
    """Record collection for a single day of listings."""

    @classmethod
    def make_manifest_key(cls, date: datetime.date) -> D.Key:
        """
        Make a key for a daily listing manifest.

        Parameters
        ----------
        date : :class:`datetime.date`
            The day that the manifest covers.

        Returns
        -------
        :class:`.Key`
            Of the form ``announcement/YYYY/MM/YYYY-MM-DD.manifest.json``.

        """
        manifest_path = date.strftime(
            'announcement/%Y/%m/%Y-%m-%d.manifest.json'
        )
        return D.Key(manifest_path)
class RecordListingMonth(RecordBase[YearMonth,
                                    datetime.date,
                                    RecordListing,
                                    D.ListingMonth]):
    """Record collection for a single month of listings."""

    @classmethod
    def make_manifest_key(cls, year_and_month: YearMonth) -> D.Key:
        """
        Make a key for a monthly listing manifest.

        Parameters
        ----------
        year_and_month : tuple
            A ``(year, month)`` pair.

        Returns
        -------
        :class:`.Key`
            Of the form ``announcement/<year>/<year>-<MM>.manifest.json``,
            where the month is left-padded with zeros to two characters and
            the year is interpolated as given.

        """
        year, month_number = year_and_month
        padded_month = str(month_number).zfill(2)
        manifest_path = (
            f'announcement/{year}/{year}-{padded_month}.manifest.json'
        )
        return D.Key(manifest_path)
class RecordListingYear(RecordBase[Year,
                                   YearMonth,
                                   RecordListingMonth,
                                   D.ListingYear]):
    """Record collection for a single year of listings."""

    @classmethod
    def make_manifest_key(cls, year: Year) -> D.Key:
        """
        Make a key for a yearly listing manifest.

        Parameters
        ----------
        year
            The year that the manifest covers; interpolated into the key as
            given.

        Returns
        -------
        :class:`.Key`
            Of the form ``announcement/<year>.manifest.json``.

        """
        manifest_path = f'announcement/{year}.manifest.json'
        return D.Key(manifest_path)
class RecordListings(RecordBase[str,
                                Year,
                                RecordListingYear,
                                D.AllListings]):
    """Root record collection covering all listings."""

    @classmethod
    def make_manifest_key(cls, _: str) -> D.Key:
        """
        Make a key for a root listing manifest.

        Parameters
        ----------
        _ : str
            Ignored; the root manifest key is fixed.

        Returns
        -------
        :class:`.Key`
            Always ``announcement.manifest.json``.

        """
        root_manifest_path = 'announcement.manifest.json'
        return D.Key(root_manifest_path)