Source code for arxiv.canonical.services.store

"""
Persist changes to the canonical record.

Provides a :class:`.CanonicalStore` that stores resources in S3, using
:mod:`.serialize.record` to serialize and deserialize resources.
"""
import gzip
import io
import logging
import os
from base64 import b64encode, urlsafe_b64decode, urlsafe_b64encode
from binascii import hexlify, unhexlify
from datetime import datetime, date
from functools import partial
from hashlib import md5
from json import dumps, load
from typing import (Optional, Dict, Any, IO, List, Callable, Tuple, Type,
                    TypeVar, Union)

import boto3
import botocore
from backports.datetime_fromisoformat import MonkeyPatch
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError
from pytz import UTC

from .. import integrity as I
from .. import record as R
from .. import domain as D
from ..manifest import (Manifest, ManifestDecoder, ManifestEntry,
                        ManifestEncoder)
from ..register import ICanonicalStorage, IStorableEntry
from ..serialize.decoder import CanonicalDecoder
from ..serialize.encoder import CanonicalEncoder
from .readable import BytesIOProxy


# Install the backported ``fromisoformat()`` support provided by
# ``backports.datetime_fromisoformat``; this runs as a side effect of importing
# this module. NOTE(review): presumably only needed on Python versions whose
# ``datetime``/``date`` lack ``fromisoformat`` -- confirm before removing.
MonkeyPatch.patch_fromisoformat()

def _parse_log_level(raw: str) -> int:
    """
    Interpret a log level given either as an integer or as a level name.

    Parameters
    ----------
    raw : str
        For example ``'40'`` or ``'ERROR'``. Level names are case-insensitive,
        and surrounding whitespace is ignored.

    Returns
    -------
    int
        The numeric :mod:`logging` level.

    Raises
    ------
    ValueError
        If ``raw`` is neither an integer nor a registered level name.
    """
    value = raw.strip()
    try:
        return int(value)
    except ValueError:
        pass
    # For a registered name, getLevelName() maps it to its numeric level.
    # For an unknown name it returns a string ("Level X") instead.
    level = logging.getLevelName(value.upper())
    if isinstance(level, int):
        return level
    raise ValueError(f'Invalid LOGLEVEL: {raw!r}')


logger = logging.getLogger(__name__)
# LOGLEVEL may be numeric (default 40, i.e. logging.ERROR) or a level name.
logger.setLevel(_parse_log_level(os.environ.get('LOGLEVEL', '40')))

# Covariant type variable constrained to the integrity classes.
# NOTE(review): not referenced elsewhere in this module as shown -- confirm it
# is unused before removing.
_I = TypeVar('_I', I.IntegrityEntry, I.IntegrityMetadata, I.IntegrityListing,
             covariant=True)

# Checksums are passed around as strings. The ``_hex_to_b64`` helper in this
# module produces them as URL-safe base64 encodings of MD5 digests.
Checksum = str


class DoesNotExist(Exception):
    """
    The requested resource does not exist.

    Raised by ``CanonicalStore._handle_client_error`` for the S3
    ``NoSuchBucket`` and ``NoSuchKey`` error codes.
    """
class CanonicalStore(ICanonicalStorage):
    """
    Persists the canonical record in S3.

    The intended pattern for working with the canonical record is to use the
    :class:`.domain.CanonicalRecord` as the primary entrypoint for all
    operations. Persistence is achieved by attaching members to
    :class:`.domain.CanonicalRecord`, :class:`.domain.Month`, and
    :class:`.domain.Listing` instances that implement reads/writes to S3. In
    this way, consumers of ``arxiv.canonical.domain`` can largely work
    directly with :class:`.domain.CanonicalRecord`, and persistence is
    handled transparently.

    This class implements :class:`.ICanonicalStorage` on top of a single S3
    bucket. Entries and manifests are written with ``put_object`` and read
    back with ``get_object``.
    """

    def __init__(self, bucket: str, verify: bool = False,
                 region_name: Optional[str] = None,
                 endpoint_url: Optional[str] = None,
                 aws_access_key_id: Optional[str] = None,
                 aws_secret_access_key: Optional[str] = None,
                 read_only: bool = True) -> None:
        """
        Initialize with connection config parameters.

        Parameters
        ----------
        bucket : str
            Name of the S3 bucket that holds the record.
        verify : bool
            Passed to boto3 as ``verify`` only when ``endpoint_url`` is set.
            With the default (``False``), TLS certificates are therefore not
            verified for custom endpoints.
        region_name : str or None
            AWS region for the client (also used by :meth:`initialize`).
        endpoint_url : str or None
            Overrides the S3 endpoint, e.g. for an S3-compatible service.
        aws_access_key_id, aws_secret_access_key : str or None
            Used only if both are set. Otherwise boto3 falls back to its
            default credential chain.
        read_only : bool
            Exposed through :attr:`read_only`. NOTE(review): not enforced by
            the write methods of this class.
        """
        self._bucket = bucket
        self._region_name = region_name
        self._endpoint_url = endpoint_url
        self._verify = verify
        self._aws_access_key_id = aws_access_key_id
        self._aws_secret_access_key = aws_secret_access_key
        self._read_only = read_only
        self.client = self._new_client()

    @property
    def read_only(self) -> bool:
        """
        Determine whether or not this is a read-only session.

        This is a read-only property to discourage users of this class from
        changing it in runtime code. It should only be set via application
        configuration.
        """
        return self._read_only

    def can_resolve(self, uri: D.URI) -> bool:
        """Return ``True`` for bucket keys and for canonical URIs."""
        return isinstance(uri, D.Key) or uri.is_canonical

    def initialize(self) -> None:
        """
        Create the configured bucket.

        Outside ``us-east-1``, S3 rejects ``CreateBucket`` unless the region
        is given as a ``LocationConstraint``, so one is added when a
        non-default region is configured.
        """
        params: Dict[str, Any] = {'Bucket': self._bucket}
        if self._region_name and self._region_name != 'us-east-1':
            params['CreateBucketConfiguration'] = {
                'LocationConstraint': self._region_name
            }
        self.client.create_bucket(**params)

    # Backwards-compatible alias for the original, misspelled method name.
    inititalize = initialize

    def is_available(self, retries: int = 0, read_timeout: int = 5,
                     connect_timeout: int = 5) -> bool:
        """
        Determine whether or not the bucket can be reached.

        Sends a ``HeadBucket`` request through a short-lived client that uses
        the given retry and timeout settings. This confirms that the bucket
        exists and is accessible with the configured credentials. It does not
        exercise write permissions.

        Returns
        -------
        bool
            ``True`` if the request succeeds. ``False`` if it fails with any
            botocore client or connection error.
        """
        config = Config(retries={'max_attempts': retries},
                        read_timeout=read_timeout,
                        connect_timeout=connect_timeout)
        try:
            self._new_client(config=config).head_bucket(Bucket=self._bucket)
        except (ClientError, BotoCoreError) as exc:
            logger.error('Store %s is not available: %s', self._bucket, exc)
            return False
        return True

    def _load_key(self, key: str) -> bytes:
        """Fetch and return the full body of the object at ``key``."""
        response = self.client.get_object(Bucket=self._bucket, Key=key)
        body: IO[bytes] = response['Body']
        return body.read()

    def load(self, key: D.URI) -> IO[bytes]:
        """Return a deferred reader that fetches ``key`` on first access."""
        load: Callable[[], bytes] = partial(self._load_key, key)
        return BytesIOProxy(load)

    def load_entry(self, key: D.URI) -> Tuple[R.RecordStream, Checksum]:
        """
        Load the object at ``key`` as a record stream plus its checksum.

        The checksum is derived from the object's ETag, with the surrounding
        quotes stripped. NOTE: this assumes a single-part upload, whose ETag
        is the hex MD5 of the object (as written by :meth:`store_entry`).
        """
        assert isinstance(key, D.Key)
        logger.debug('Load entry at %s', key)
        response = self.client.get_object(Bucket=self._bucket, Key=key)
        content_type = D.ContentType.from_mimetype(response['ContentType'])
        size_bytes = response['ContentLength']
        stream = R.RecordStream(
            domain=D.CanonicalFile(
                modified=response['LastModified'],
                filename=key.filename,
                size_bytes=size_bytes,
                content_type=content_type,
                ref=key
            ),
            # Defer reading the body until the content is actually consumed.
            content=BytesIOProxy(response['Body'].read),
            content_type=content_type,
            size_bytes=size_bytes
        )
        return stream, _hex_to_b64(response['ETag'][1:-1])

    def list_subkeys(self, key: str) -> List[str]:
        """
        List the next path segment of every object stored under ``key``.

        One element is returned per object, so a sub-prefix that contains
        several objects appears several times. The results are paginated, so
        prefixes with more than 1000 objects are listed completely. An empty
        list is returned when nothing matches.
        """
        paginator = self.client.get_paginator('list_objects_v2')
        subkeys: List[str] = []
        for page in paginator.paginate(Bucket=self._bucket, Prefix=key):
            # S3 omits 'Contents' entirely when a page has no matching keys.
            for obj in page.get('Contents', []):
                remainder = obj['Key'][len(key):]
                subkeys.append(remainder.split('/', 1)[0])
        return subkeys

    @staticmethod
    def _checksum_to_content_md5(checksum: Checksum) -> str:
        """Convert a URL-safe base64 MD5 checksum to a Content-MD5 value."""
        digest = urlsafe_b64decode(checksum.encode('utf-8'))
        return b64encode(digest).decode('ascii')

    def store_entry(self, ri: IStorableEntry) -> None:
        """
        Upload an entry's content, decompressing gzipped content first.

        For gzipped content, the entry's domain size and flag are updated
        and its checksum is recomputed over the decompressed bytes. In all
        cases the entry's stream content is afterwards replaced by a deferred
        reader (:meth:`load`) for the stored key.
        """
        assert ri.record.stream.content is not None
        was_gzipped = ri.record.stream.domain.is_gzipped
        if was_gzipped:
            with gzip.GzipFile(fileobj=ri.record.stream.content) as gz:
                body = gz.read()
            # ri.checksum is only refreshed after the upload (see
            # update_checksum() below), so hash the bytes actually sent.
            content_md5 = b64encode(md5(body).digest()).decode('ascii')
        else:
            body = ri.record.stream.content.read()
            content_md5 = self._checksum_to_content_md5(ri.checksum)
        size_bytes = len(body)
        # S3 expects Content-MD5 as the *standard* base64 encoding of the raw
        # 16-byte digest (RFC 1864), not as a hex string. A mismatch makes S3
        # reject the upload, which gives an end-to-end integrity check.
        self.client.put_object(Bucket=self._bucket,
                               Key=ri.record.key,
                               Body=body,
                               ContentLength=size_bytes,
                               ContentMD5=content_md5,
                               ContentType=ri.record.stream.content_type.mime_type)

        # Update the CanonicalFile if necessary.
        if was_gzipped:
            ri.record.stream.domain.size_bytes = size_bytes
            ri.record.stream.domain.is_gzipped = False
            # Use an in-memory buffer for the checksum, to cut down on
            # unnecessary IO.
            ri.record.stream = \
                ri.record.stream._replace(content=io.BytesIO(body))
            ri.update_checksum()

        # Finally, replace the content IO with a deferred IO.
        ri.record.stream = ri.record.stream._replace(
            content=self.load(ri.record.key)
        )

    def store_manifest(self, key: str, manifest: Manifest) -> None:
        """
        Serialize ``manifest`` as JSON and upload it to ``key``.

        NOTE(review): confirm that ``I.checksum.checksum_raw`` returns the
        standard-base64 MD5 that S3 expects for ``ContentMD5``.
        """
        body = dumps(manifest, cls=ManifestEncoder).encode('utf-8')
        self.client.put_object(Bucket=self._bucket,
                               Key=key,
                               Body=body,
                               ContentLength=len(body),
                               ContentMD5=I.checksum.checksum_raw(body),
                               ContentType='application/json')

    def load_manifest(self, key: str) -> Manifest:
        """Download the JSON manifest at ``key`` and decode it."""
        response = self.client.get_object(Bucket=self._bucket, Key=key)
        manifest: Manifest = load(response['Body'], cls=ManifestDecoder)
        return manifest

    def _handle_client_error(self, exc: ClientError) -> None:
        """
        Translate a boto ``ClientError`` into a domain exception.

        Raises :class:`DoesNotExist` for missing buckets and keys, and
        ``RuntimeError`` for any other error code.

        NOTE(review): no method in this class calls this, so reads and writes
        currently let ``ClientError`` propagate unchanged.
        """
        if exc.response['Error']['Code'] == 'NoSuchBucket':
            raise DoesNotExist(f'{self._bucket} does not exist') from exc
        if exc.response['Error']['Code'] == "NoSuchKey":
            raise DoesNotExist(f'No such object in {self._bucket}') from exc
        raise RuntimeError('Unhandled ClientError') from exc

    def _new_client(self, config: Optional[Config] = None) -> boto3.client:
        """
        Build an S3 client from the stored connection settings.

        ``config``, if given, is passed through to boto3 (e.g. for retry and
        timeout settings).
        """
        # Only add credentials to the client if they are explicitly set.
        # If they are not set, boto3 falls back to environment variables and
        # credentials files.
        params: Dict[str, Any] = {'region_name': self._region_name}
        if self._aws_access_key_id and self._aws_secret_access_key:
            params['aws_access_key_id'] = self._aws_access_key_id
            params['aws_secret_access_key'] = self._aws_secret_access_key
        if self._endpoint_url:
            params['endpoint_url'] = self._endpoint_url
            params['verify'] = self._verify
        if config is not None:
            params['config'] = config
        return boto3.client('s3', **params)
class InMemoryStorage(ICanonicalStorage):
    """
    Keeps canonical entries and manifests in process memory.

    Implements the same :class:`.ICanonicalStorage` interface as
    :class:`CanonicalStore`, but is backed by two dictionaries instead of S3.
    """

    def __init__(self) -> None:
        # key -> (stored stream, checksum at the time it was stored)
        self._streams: Dict[D.URI, Tuple[R.RecordStream, str]] = {}
        # manifest key -> manifest
        self._manifests: Dict[str, Manifest] = {}

    def can_resolve(self, uri: D.URI) -> bool:
        """Return ``True`` only if an entry has been stored under ``uri``."""
        return uri in self._streams

    def load(self, key: D.URI) -> IO[bytes]:
        """
        Return the content IO of the entry stored under ``key``.

        Each call returns the same stored IO object. Raises ``KeyError`` if
        nothing is stored under ``key``.
        """
        stream, _checksum = self._streams[key]
        return stream.content

    def load_entry(self, key: D.URI) -> Tuple[R.RecordStream, str]:
        """Return the ``(stream, checksum)`` pair stored under ``key``."""
        assert isinstance(key, D.Key)
        entry = self._streams[key]
        return entry

    def list_subkeys(self, key: str) -> List[str]:
        """
        List the next path segment of every stored key below ``key``.

        ``key`` itself is excluded. One element is produced per stored key,
        so duplicates are possible.
        """
        found: List[str] = []
        for stored_key in self._streams:
            if stored_key == key or not stored_key.startswith(key):
                continue
            remainder = stored_key.split(key, 1)[1]
            found.append(remainder.split('/', 1)[0])
        return found

    def store_entry(self, ri: IStorableEntry) -> None:
        """
        Store an entry's stream and checksum under its record key.

        Gzipped content is decompressed into an in-memory buffer first, and
        the entry's size, gzip flag and checksum are updated accordingly.
        The stored domain object's ``ref`` is then set to the record key.
        """
        assert ri.record.stream.content is not None
        if ri.record.stream.domain.is_gzipped:
            with gzip.GzipFile(fileobj=ri.record.stream.content) as gz:
                raw = gz.read()
            file_meta = ri.record.stream.domain
            file_meta.size_bytes = len(raw)
            file_meta.is_gzipped = False
            ri.record.stream = ri.record.stream._replace(
                content=io.BytesIO(raw)
            )
            ri.update_checksum()
        self._streams[ri.record.key] = (ri.record.stream, ri.checksum)
        ri.record.stream.domain.ref = ri.record.key

    def store_manifest(self, key: str, manifest: Manifest) -> None:
        """Keep ``manifest`` under ``key``, replacing any previous value."""
        self._manifests[key] = manifest

    def load_manifest(self, key: str) -> Manifest:
        """Return the manifest stored under ``key`` (``KeyError`` if absent)."""
        manifest = self._manifests[key]
        return manifest
def _b64_to_hex(checksum: Checksum) -> str:
    """Convert a URL-safe base64-encoded digest to a lowercase hex string."""
    raw_digest = urlsafe_b64decode(checksum.encode('utf-8'))
    hex_bytes = hexlify(raw_digest)
    return hex_bytes.decode('utf-8')


def _hex_to_b64(etag: str) -> Checksum:
    """
    Convert an MD5 hexdigest to a URL-safe base64-encoded digest.

    The caller in this module passes an S3 ETag with its quotes stripped.
    """
    raw_digest = unhexlify(etag)
    encoded = urlsafe_b64encode(raw_digest)
    return encoded.decode('utf-8')