Source code for arxiv.canonical.integrity.checksum
import io
from base64 import urlsafe_b64encode
from hashlib import md5
from operator import itemgetter
from typing import List, IO, Union, cast
from ..record import RecordStream
from ..manifest import Manifest
from .exceptions import ChecksumError
[docs]def calculate_checksum(obj: Union[bytes, IO[bytes], Manifest, RecordStream]) \
-> str:
if isinstance(obj, bytes):
return checksum_raw(obj)
if isinstance(obj, dict):
return checksum_manifest(obj)
if isinstance(obj, io.IOBase):
return checksum_io(obj)
if isinstance(obj, RecordStream):
assert obj.content is not None
return checksum_io(obj.content)
raise TypeError(f'Cannot generate a checksum from a {type(obj)}')
[docs]def checksum_raw(raw: bytes) -> str:
hash_md5 = md5()
hash_md5.update(raw)
return urlsafe_b64encode(hash_md5.digest()).decode('utf-8')
[docs]def checksum_io(content: IO[bytes]) -> str:
"""Generate an URL-safe base64-encoded md5 hash of an IO."""
if content.seekable:
content.seek(0) # Make sure that we are at the start of the stream.
hash_md5 = md5()
for chunk in iter(lambda: content.read(4096), b""):
hash_md5.update(chunk)
if content.seekable:
content.seek(0) # Be a good neighbor for subsequent users.
return urlsafe_b64encode(hash_md5.digest()).decode('utf-8')
[docs]def checksum_manifest(manifest: Manifest) -> str:
components: List[str] = []
for entry in sorted(manifest['entries'], key=itemgetter('key')):
if 'checksum' not in entry or entry['checksum'] is None:
raise ChecksumError(f'Missing checksum: {entry}')
components.append(entry['checksum'])
return checksum_raw(''.join(components).encode('utf-8'))