Source code for arxiv.canonical.classic.abs

"""Parse fields from a single arXiv abstract (.abs) file."""

import os
import re
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, \
    NamedTuple
from functools import wraps
from dateutil import parser
from pytz import timezone
from datetime import datetime, date
from dateutil.tz import tzutc, gettz

from .. import domain as D

AnyIdentifier = Union[D.VersionedIdentifier, D.Identifier]

EASTERN = gettz('US/Eastern')

RE_ABS_COMPONENTS = re.compile(r'^\\\\\n', re.MULTILINE)
RE_FROM_FIELD = re.compile(
    r'(?P<from>From:\s*)(?P<name>[^<]+)?\s+(<(?P<email>.*)>)?')
RE_DATE_COMPONENTS = re.compile(
    r'^Date\s*(?::|\(revised\s*(?P<version>.*?)\):)\s*(?P<date>.*?)'
    r'(?:\s+\((?P<size_kilobytes>\d+)kb,?(?P<source_type>.*)\))?$')
RE_FIELD_COMPONENTS = re.compile(
    r'^(?P<field>[-a-z\)\(]+\s*):\s*(?P<value>.*)', re.IGNORECASE)
RE_ARXIV_ID_FROM_PREHISTORY = re.compile(
    r'(Paper:\s+|arXiv:)(?P<arxiv_id>\S+)')

NAMED_FIELDS = ['Title', 'Authors', 'Categories', 'Comments', 'Proxy',
                'Report-no', 'ACM-class', 'MSC-class', 'Journal-ref',
                'DOI', 'License']
"""
Fields that may be parsed from the key-value pairs in second
major component of .abs string. Field names are not normalized.
"""

REQUIRED_FIELDS = ['title', 'authors', 'abstract']
"""
Required parsed fields with normalized field names.

Note the absense of 'categories' as a required field. A subset of version-
affixed .abs files with the old identifiers predate the introduction of
categories and therefore do not have a "Categories:" line; only the (higher-
level) archive and group can be be inferred, and this must be done via the
identifier itself.

The latest versions of these papers should always have the "Categories:" line.
"""

# arXiv ID format used from 1991 to 2007-03
RE_ARXIV_OLD_ID = re.compile(
    r'^(?P<archive>[a-z]{1,}(\-[a-z]{2,})?)(\.([a-zA-Z\-]{2,}))?\/'
    r'(?P<yymm>(?P<yy>\d\d)(?P<mm>\d\d))(?P<num>\d\d\d)'
    r'(v(?P<version>[1-9]\d*))?([#\/].*)?$')

# arXiv ID format used from 2007-04 to present
RE_ARXIV_NEW_ID = re.compile(
    r'^(?P<yymm>(?P<yy>\d\d)(?P<mm>\d\d))\.(?P<num>\d{4,5})'
    r'(v(?P<version>[1-9]\d*))?([#\/].*)?$'
)

ASSUMED_LICENSE = D.License(
    href='http://arxiv.org/licenses/nonexclusive-distrib/1.0/'
)


[docs]class AbsRef(NamedTuple): identifier: D.VersionedIdentifier submitted_date: datetime announced_month: str source_type: D.SourceType size_kilobytes: int
[docs]class AbsData(NamedTuple): identifier: D.VersionedIdentifier submitter: Optional[D.Person] submitted_date: datetime announced_month: str updated_date: datetime license: D.License primary_classification: D.Category title: str abstract: str authors: str size_kilobytes: int submission_type: D.EventType secondary_classification: List[D.Category] source_type: Optional[D.SourceType] = None journal_ref: Optional[str] = None report_num: Optional[str] = None doi: Optional[str] = None msc_class: Optional[str] = None acm_class: Optional[str] = None proxy: Optional[str] = None comments: str = '' previous_versions: Optional[List[AbsRef]] = None
[docs]class NoSuchAbs(RuntimeError): pass
[docs]def original_base_path(data_path: str) -> str: return os.path.join(data_path, 'orig')
[docs]def latest_base_path(data_path: str) -> str: return os.path.join(data_path, 'ftp')
[docs]def latest_path_month(data_path: str, identifier: AnyIdentifier) -> str: """ Get the base path for the month block containing the "latest" e-prints. This is where the most recent version of each e-print always lives. """ return os.path.join( latest_base_path(data_path), identifier.category_part if identifier.is_old_style else 'arxiv', 'papers', identifier.yymm )
[docs]def original_path_month(data_path: str, identifier: AnyIdentifier) -> str: """ Get the main base path for an abs file. This is where all of the versions except for the most recent one live. """ return os.path.join( original_base_path(data_path), identifier.category_part if identifier.is_old_style else 'arxiv', 'papers', identifier.yymm )
[docs]def latest_path(data_path: str, identifier: AnyIdentifier) -> str: return os.path.join(latest_path_month(data_path, identifier), f'{identifier.numeric_part}.abs')
[docs]def original_path(data_path: str, identifier: D.VersionedIdentifier) -> str: return os.path.join(original_path_month(data_path, identifier), f'{identifier.numeric_part}v{identifier.version}.abs')
[docs]def get_path(data_path: str, identifier: D.VersionedIdentifier) -> str: # We look first for an "original" abs file that is explicitly identified # as the version we are looking for. path = original_path(data_path, identifier) if os.path.exists(path): return path # If we are asking for the first version and haven't found it already, the # only possibility is that there is one version and its abs file is located # in the "latest" section. if identifier.version == 1: path = latest_path(data_path, identifier) if not os.path.exists(path): raise NoSuchAbs(f'Cannot find abs record for {identifier}') return path # The only remaining possibility is that the version we are looking for # is indeed the "latest" version, in which case we must be able to find # an abs record for the previous version in the "original" section. previous = D.VersionedIdentifier.from_parts(identifier.arxiv_id, identifier.version - 1) if os.path.exists(original_path(data_path, previous)): return latest_path(data_path, identifier) # Voila! raise NoSuchAbs(f'Cannot find abs record for {identifier}')
[docs]def parse_versions(data_path: str, identifier: D.Identifier) \ -> Iterable[AbsData]: return [parse(data_path, v) for v in list_versions(data_path, identifier)]
[docs]def parse_latest(data_path: str, identifier: D.Identifier) -> AbsData: """Parse the abs for the latest version of an e-print.""" return _parse(latest_path(data_path, identifier))
[docs]def parse_first(data_path: str, identifier: D.Identifier) -> AbsData: """Parse the abs for the first version of an e-print.""" return _parse(get_path(data_path, D.VersionedIdentifier.from_parts(identifier, 1)))
[docs]def iter_all(data_path: str, from_id: Optional[D.Identifier] = None, to_id: Optional[D.Identifier] = None) -> Iterable[D.Identifier]: """ List all of the identifiers for which we have abs files. The "latest" section will have an abs file for every e-print, so that's the only place we need look. """ latest_root = latest_base_path(data_path) for dirpath, _, filenames in os.walk(latest_root): for filename in filenames: if filename.endswith('.abs'): prefix = dirpath.split(latest_root)[1].split('/')[1] numeric_part, _ = os.path.splitext(filename) if prefix == 'arxiv': identifier = D.Identifier(numeric_part) else: identifier = D.Identifier(f'{prefix}/{numeric_part}') if from_id and identifier < from_id: continue elif to_id and identifier >= to_id: continue yield identifier
[docs]def list_versions(data_path: str, identifier: D.Identifier) \ -> List[D.VersionedIdentifier]: """ List all of the versions for an identifier from abs files. This works by looking at the presence of abs files in both the "latest" and "original" locations. """ identifiers: List[D.VersionedIdentifier] = [] # We look first at "original" versions, as they will be explicitly named # with their numeric version affix. old_versions_exist = False orig_month_root = original_path_month(data_path, identifier) category = orig_month_root.split(data_path)[1].split('/')[2] for dpath, _, fnames in os.walk(orig_month_root): for filename in sorted(fnames): if filename.endswith('.abs') \ and filename.startswith(identifier.numeric_part): numeric_part_v, _ = os.path.splitext(filename) if identifier.is_old_style: vid = D.VersionedIdentifier(f'{category}/{numeric_part_v}') else: vid = D.VersionedIdentifier(numeric_part_v) old_versions_exist = True identifiers.append(vid) if old_versions_exist: # We are looking only at past versions above; the most recent version # lives somewhere else. We can infer its existence. _, v = numeric_part_v.split('v') identifiers.append( D.VersionedIdentifier.from_parts(identifier, int(v) + 1) ) elif os.path.exists(latest_path(data_path, identifier)): # There is only one version, the first version, and it is the # latest version. identifiers.append(D.VersionedIdentifier.from_parts(identifier, 1)) return identifiers
[docs]def parse(data_path: str, identifier: D.VersionedIdentifier) -> AbsData: return _parse(get_path(data_path, identifier))
def _parse(path: str) -> AbsData: with open(path, mode='r', encoding='latin-1') as f: raw = f.read() # The best we can do to infer when the last update was made was to examine # the modification time of the abs file itself. mtime = os.path.getmtime(path) modified = datetime.fromtimestamp(mtime, tz=EASTERN).astimezone(tz=tzutc()) # There are two main components to an .abs file that contain data, # but the split must always return four components. components = RE_ABS_COMPONENTS.split(raw) if not len(components) == 4: raise IOError(f'Unexpected number of components parsed from {path}') # Everything else is in the second main component. prehistory, misc_fields = re.split(r'\n\n', components[1]) fields: Dict[str, Any] = _parse_metadata(key_value_block=misc_fields) # Abstract is the first main component. fields['abstract'] = components[2] id_match = RE_ARXIV_ID_FROM_PREHISTORY.match(prehistory) if not id_match: raise IOError('Could not extract arXiv ID from prehistory component.') arxiv_id = id_match.group('arxiv_id') prehistory = re.sub(r'^.*\n', '', prehistory) parsed_version_entries = re.split(r'\n', prehistory) # Submitter data. from_match = RE_FROM_FIELD.match(parsed_version_entries.pop(0)) if not from_match: raise IOError('Could not extract submitter data.') name = from_match.group('name') if name is not None: name = name.rstrip() # Get the version history for this particular version of the document. if not len(parsed_version_entries) >= 1: raise IOError('At least one version entry expected.') versions = _parse_versions(arxiv_id=arxiv_id, version_entry_list=parsed_version_entries) secondary_classification: List[str] = [] if 'categories' in fields and fields['categories']: classifications = fields['categories'].split() primary_classification = classifications[0] secondary_classification = classifications[1:] else: match = RE_ARXIV_OLD_ID.match(arxiv_id) if not match: raise IOError('Could not determine primary classification') primary_classification = match.group('archive') if 'license' in fields: license = D.License(fields['license']) else: license = ASSUMED_LICENSE if versions[-1].identifier.version == 1: submission_type = D.EventType.NEW elif versions[-1].size_kilobytes == 0: submission_type = D.EventType.WITHDRAWN else: submission_type = D.EventType.REPLACED return AbsData( identifier=versions[-1].identifier, submitter=D.Person(full_name=name) if name else None, submitted_date=versions[-1].submitted_date, announced_month=versions[-1].announced_month, updated_date=modified, license=license, primary_classification=primary_classification, title=fields['title'], abstract=fields['abstract'], authors=fields['authors'], source_type=versions[-1].source_type, size_kilobytes=versions[-1].size_kilobytes, submission_type=submission_type, secondary_classification=secondary_classification, journal_ref=fields.get('journal_ref'), report_num=fields.get('report_num'), doi=fields.get('doi'), msc_class=fields.get('msc_class'), acm_class=fields.get('acm_class'), proxy=fields.get('proxy'), comments=fields.get('comments', ''), previous_versions=versions[:-1], ) def _parse_metadata(key_value_block: str) -> Dict[str, str]: """Parse the key-value block from the arXiv .abs string.""" key_value_block = key_value_block.lstrip() field_lines = re.split(r'\n', key_value_block) field_name = 'unknown' fields_builder: Dict[str, str] = {} for field_line in field_lines: field_match = RE_FIELD_COMPONENTS.match(field_line) if field_match and field_match.group('field') in NAMED_FIELDS: field_name = field_match.group('field').lower().replace('-', '_') field_name = re.sub(r'_no$', '_num', field_name) fields_builder[field_name] = field_match.group('value').rstrip() elif field_name != 'unknown': # we have a line with leading spaces fields_builder[field_name] += re.sub(r'^\s+', ' ', field_line) return fields_builder def _parse_announced(arxiv_id: str) -> str: match = RE_ARXIV_OLD_ID.match(arxiv_id) if not match: match = RE_ARXIV_NEW_ID.match(arxiv_id) if not match: raise ValueError('Not a valid arXiv ID') yy = int(match.group('yy')) mm = int(match.group('mm')) year = f'19{yy}' if yy > 90 else f'20{yy}' return f'{year}-{mm}' def _parse_versions(arxiv_id: str, version_entry_list: List) -> List[AbsRef]: """Parse the version entries from the arXiv .abs file.""" version_entries: List[AbsRef] = list() for parsed_version_entry in version_entry_list: date_match = RE_DATE_COMPONENTS.match(parsed_version_entry) if not date_match: raise IOError('Could not extract date components from date line.') try: sd = date_match.group('date') submitted_date = parser.parse(date_match.group('date')) except (ValueError, TypeError): raise IOError(f'Could not parse submitted date {sd} as datetime') source_type = D.SourceType(date_match.group('source_type')) size_kilobytes = int(date_match.group('size_kilobytes')) V = len(version_entries) + 1 identifier = \ D.VersionedIdentifier(f'{D.Identifier(arxiv_id)}v{V}') version_entries.append( AbsRef( identifier=identifier, submitted_date=submitted_date, announced_month=_parse_announced(arxiv_id), source_type=source_type, size_kilobytes=size_kilobytes ) ) return version_entries