"""
Functions for resolving classic content.
TODO: really need to cache stuff here.
"""
import logging
import os
import time
from datetime import datetime
from functools import partial
from typing import Callable, Iterable, List, MutableMapping, Optional, Tuple
from pytz import timezone
from .. import domain as D
from ..services import RemoteSource
# Module-level logger. Its threshold is taken from the LOGLEVEL environment
# variable (a numeric logging level); the default, 40, is ``logging.ERROR``.
logger = logging.getLogger(__name__)
logger.setLevel(int(os.environ.get('LOGLEVEL', '40')))
# pytz timezone into which file modification times are converted below.
ET = timezone('US/Eastern')
# Shared HTTP client; starts out unset and is created on first use inside
# ``_get_via_http``.
REMOTE: Optional['RemoteSourceWithHead'] = None
# Type alias for a mapping keyed on (versioned identifier, content type) whose
# values are canonical files, or None.
_CF_Cache = MutableMapping[Tuple[D.VersionedIdentifier, D.ContentType],
Optional[D.CanonicalFile]]
def get_source_path(dpath: str, ident: D.VersionedIdentifier) -> str:
    """
    Locate the classic source file for a specific version of an e-print.

    Every extension reported by ``D.list_source_extensions()`` is tried in
    order, and the first one for which a path can be resolved wins.

    Parameters
    ----------
    dpath : str
        Root of the classic data directory.
    ident : VersionedIdentifier
        The version whose source is wanted.

    Returns
    -------
    str
        Path to the source file.

    Raises
    ------
    IOError
        If no extension yields a path.

    """
    # Lazy, so probing stops at the first extension that resolves.
    candidates = (
        _get_path(_orig_source, _latest_source, dpath, ident, extension)
        for extension in D.list_source_extensions()
    )
    found = next((c for c in candidates if c is not None), None)
    if found is None:
        raise IOError(f'No source path found for {ident}')
    return found
def get_source(data: str, ident: D.VersionedIdentifier) -> D.CanonicalFile:
    """
    Get the source file for a version from classic.

    Parameters
    ----------
    data : str
        Root of the classic data directory.
    ident : VersionedIdentifier
        The version whose source is wanted.

    Returns
    -------
    CanonicalFile
        Describes the source file on disk: modification time (in US/Eastern),
        size, content type, a URI built from the local path, the canonical
        filename for ``ident``, and whether the file name ends in ``.gz``.

    Raises
    ------
    IOError
        Propagated from ``get_source_path`` when no source file is found.

    """
    logger.debug('Getting source for %s', ident)
    path = get_source_path(data, ident)

    # Convert the POSIX timestamp straight to an aware datetime. Going via a
    # naive ``utcfromtimestamp()`` value and then ``astimezone()`` would
    # treat the UTC wall-clock time as *system-local* time, giving a wrong
    # result on any host whose timezone is not UTC.
    mtime = datetime.fromtimestamp(os.path.getmtime(path), tz=ET)

    try:
        content_type = D.ContentType.from_filename(path)
    except ValueError:
        # In classic, stand-alone tex files were not given extensions.
        content_type = D.ContentType.tex

    cf = D.CanonicalFile(
        modified=mtime,
        size_bytes=os.path.getsize(path),
        content_type=content_type,
        ref=D.URI(path),
        filename=content_type.make_filename(ident),
        is_gzipped=path.endswith('.gz')
    )
    logger.debug('Got source file for %s: %s', ident, cf.ref)
    return cf
def _get_via_http(ident: D.VersionedIdentifier,
                  content_type: D.ContentType,
                  remote: str = 'arxiv.org') -> Optional[D.CanonicalFile]:
    """
    Retrieve metadata about a rendered product with an HTTP HEAD request.

    Parameters
    ----------
    ident : VersionedIdentifier
        The version for which the product is wanted.
    content_type : ContentType
        The kind of product; its ``value`` is used as the URL route.
    remote : str
        Host to query. Defaults to ``arxiv.org``.

    Returns
    -------
    CanonicalFile or None
        Whatever ``RemoteSourceWithHead.head`` returns for the product URL,
        with ``filename`` set to the canonical filename for ``ident``.
        ``None`` is passed through unchanged.

    """
    logger.debug('Getting metadata for %s for %s via http',
                 content_type.value, ident)

    global REMOTE   # This is fine for now since this is single-threaded.
    # NOTE(review): the client is created once, using the ``remote`` of the
    # first call, and is reused by later calls whatever their ``remote`` is.
    if REMOTE is None:
        REMOTE = RemoteSourceWithHead(remote)

    # The .dvi extension is not supported in the classic /dvi route.
    if content_type == D.ContentType.dvi:
        path = f'{content_type.value}/{ident}'
    else:
        path = f'{content_type.value}/{ident}.{content_type.ext}'

    # Build the URL from ``remote`` rather than a hard-coded host, so that
    # the request goes to the host the caller asked for.
    cf = REMOTE.head(D.URI(f'https://{remote}/{path}'), content_type)
    if cf is not None:
        cf.filename = content_type.make_filename(ident)
    return cf
class RemoteSourceWithHead(RemoteSource):
    """A ``RemoteSource`` that can describe a resource via a HEAD request."""

    def head(self, key: D.URI, content_type: D.ContentType) \
            -> Optional[D.CanonicalFile]:
        """
        Describe the resource at ``key`` without downloading its content.

        Parameters
        ----------
        key : URI
            Location of the resource.
        content_type : ContentType
            Content type to record on the returned file.

        Returns
        -------
        CanonicalFile or None
            Built from the response headers (``Last-Modified``,
            ``Content-Length``, ``Content-Encoding``) and the final URL after
            redirects. ``None`` if the response has no ``Last-Modified``
            header.

        Raises
        ------
        IOError
            If the final response status is not 200.

        """
        response = self._session.head(key, allow_redirects=True)

        # arXiv may need to rebuild the product. While it does, the response
        # carries a ``Refresh`` header; wait that many seconds and ask again.
        while response.status_code == 200 and 'Refresh' in response.headers:
            time.sleep(int(response.headers['Refresh']))
            response = self._session.head(key, allow_redirects=True)

        if response.status_code != 200:
            logger.error('%i: %s', response.status_code, response.headers)
            raise IOError(f'Could not retrieve {key}: {response.status_code}')

        # At this point, we are most likely encountering the "unavailable"
        # page, which (intriguingly) returns 200 instead of 404.
        if 'Last-Modified' not in response.headers:
            return None

        # ``strptime`` does not attach a tzinfo for ``%Z``, so the parsed
        # value is naive. Mark it explicitly as UTC before converting;
        # calling ``astimezone()`` on the naive value would interpret it as
        # system-local time instead.
        parsed = datetime.strptime(response.headers['Last-Modified'],
                                   '%a, %d %b %Y %H:%M:%S %Z')
        mtime = parsed.replace(tzinfo=timezone('UTC')).astimezone(ET)

        # Oddly, arxiv.org may return compressed content (i.e. not just for
        # transport). We've been around for a while!
        is_gzipped = response.headers.get('Content-Encoding') == 'x-gzip'

        return D.CanonicalFile(
            modified=mtime,
            size_bytes=int(response.headers['Content-Length']),
            content_type=content_type,
            ref=D.URI(response.url),    # There may have been redirects.
            is_gzipped=is_gzipped
        )
def _latest(data: str, ident: D.Identifier, filename: str) -> str:
    """
    Build the path of ``filename`` under the ``ftp`` tree of ``data``.

    The layout is ``<data>/ftp/<archive>/papers/<yymm>/<filename>``, where the
    archive is the identifier's category part for old-style identifiers and
    ``arxiv`` otherwise.
    """
    if ident.is_old_style:
        archive = ident.category_part
    else:
        archive = 'arxiv'
    return os.path.join(data, 'ftp', archive, 'papers', ident.yymm, filename)
def _orig(data: str, ident: D.VersionedIdentifier, filename: str) -> str:
    """
    Build the path of ``filename`` under the ``orig`` tree of ``data``.

    The layout is ``<data>/orig/<archive>/papers/<yymm>/<filename>``, where
    the archive is the identifier's category part for old-style identifiers
    and ``arxiv`` otherwise.
    """
    if ident.is_old_style:
        archive = ident.category_part
    else:
        archive = 'arxiv'
    return os.path.join(data, 'orig', archive, 'papers', ident.yymm, filename)
def _cache(content_type: D.ContentType, ps_cache_path: str,
           ident: D.VersionedIdentifier, ext: str) -> str:
    """
    Build the path of a cached product under ``ps_cache``.

    The layout is
    ``<ps_cache_path>/ps_cache/<archive>/<content type>/<yymm>/<name>.<ext>``.
    Old-style identifiers use their category part as the archive and
    ``<numeric part>v<version>`` as the name; all others use ``arxiv`` and the
    identifier itself.
    """
    if ident.is_old_style:
        basename = f'{ident.numeric_part}v{ident.version}.{ext}'
        archive = ident.category_part
    else:
        basename = f'{ident}.{ext}'
        archive = 'arxiv'
    return os.path.join(ps_cache_path, 'ps_cache', archive,
                        content_type.value, ident.yymm, basename)
def _latest_source(path: str, ident: D.Identifier, ext: str) -> str:
    """
    Build the path of an unversioned source file via ``_latest``.

    The file name is the numeric part (old-style identifiers) or the whole
    identifier, followed by ``ext`` with any leading dots normalised to one.
    """
    stem = ident.numeric_part if ident.is_old_style else ident
    suffix = ext.lstrip(".")
    return _latest(path, ident, f'{stem}.{suffix}')
def _orig_source(path: str, ident: D.VersionedIdentifier, ext: str) -> str:
    """
    Build the path of a versioned source file via ``_orig``.

    The file name is ``<numeric part>v<version>`` (old-style identifiers) or
    the whole identifier, followed by ``ext`` with any leading dots
    normalised to one.
    """
    if ident.is_old_style:
        stem = f'{ident.numeric_part}v{ident.version}'
    else:
        stem = f'{ident}'
    suffix = ext.lstrip(".")
    return _orig(path, ident, f'{stem}.{suffix}')
def _get_path(get_orig: Callable[[str, D.VersionedIdentifier, str], str],
              get_latest: Callable[[str, D.Identifier, str], str],
              dpath: str,
              ident: D.VersionedIdentifier,
              ext: str) -> Optional[str]:
    """
    Generic logic for finding the path to a resource.

    Resources for the latest version are stored separately from resources
    for prior versions. But resources for the latest version are not named
    with their version number affix.

    A second challenge is that in some cases we do not know ahead of time what
    file format (and hence filename) we are looking for.

    So it takes a bit of a dance to figure out whether a corresponding
    resource exists, and where it is located.

    Parameters
    ----------
    get_orig : callable
        Builds the candidate path for a versioned resource from
        ``(dpath, versioned identifier, ext)``.
    get_latest : callable
        Builds the candidate path for the latest resource from
        ``(dpath, identifier, ext)``.
    dpath : str
        Root of the classic data directory.
    ident : VersionedIdentifier
        The version being looked for.
    ext : str
        File extension to look for.

    Returns
    -------
    str or None
        Path of the resource, or ``None`` if it could not be located.

    """
    # For versions prior to the latest, resources are named with their
    # version affix.
    orig = get_orig(dpath, ident, ext)
    logger.debug(orig)
    if os.path.exists(orig):
        logger.debug('found orig path: %s', orig)
        return orig

    # Every remaining possibility resolves to the "latest" path, so if that
    # file is absent there is nothing more to find.
    latest = get_latest(dpath, ident.arxiv_id, ext)
    if not os.path.exists(latest):
        return None

    # If this is the first version, the only other place it could be is
    # in the "latest" section. Returning here also avoids constructing an
    # identifier for a non-existent version 0 below.
    if ident.version == 1:
        logger.debug('can only be in latest: %s', latest)
        return latest

    # If the prior version exists in the "original" section, then the latest
    # version must be the one that we are working with.
    prior = D.VersionedIdentifier.from_parts(ident.arxiv_id,
                                             ident.version - 1)
    # Have to check for the abs file, since we don't know what format the
    # previous version was in.
    if os.path.exists(get_orig(dpath, prior, 'abs')):
        logger.debug('prior version in orig; must be latest: %s', latest)
        return latest
    return None