"""
Integration with the classic database to persist events and submission state.
As part of the classic renewal strategy, development of new submission
interfaces must maintain data interoperability with classic components. This
service module must therefore do three main things:
1. Store and provide access to event data generated during the submission
process,
2. Keep the classic database tables up to date so that "downstream" components
can continue to operate.
3. Patch NG submission data with state changes that occur in the classic
system. Those changes will be made directly to submission tables and not
involve event-generation. See :func:`get_submission` for details.
Since classic components work directly on submission tables, persisting events
and resulting submission state must occur in the same transaction. We must also
verify that we are not storing events that are stale with respect to the
current state of the submission. To achieve this, the caller should use the
:func:`.util.transaction` context manager, and (when committing new events)
call :func:`.get_submission` with ``for_update=True``. This will trigger a
shared lock on the submission row(s) involved until the transaction is
committed or rolled back.
ORM representations of the classic database tables involved in submission
are located in :mod:`.classic.models`. An additional model, :class:`.DBEvent`,
is defined in :mod:`.classic.event`.
See also :ref:`legacy-integration`.
"""
from typing import List, Optional, Tuple, Set, Callable, Any
from retry import retry
from datetime import datetime
from pytz import UTC
from itertools import groupby
import copy
from functools import reduce, wraps
from operator import ior
from dataclasses import asdict
from flask import Flask
from sqlalchemy import or_, text
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.exc import DBAPIError, OperationalError
from arxiv.base import logging
from arxiv.base.globals import get_application_config, get_application_global
from ...domain.event import Event, Announce, RequestWithdrawal, SetDOI, \
SetJournalReference, SetReportNumber, Rollback, RequestCrossList, \
ApplyRequest, RejectRequest, ApproveRequest, AddProposal, CancelRequest, \
CreateSubmission
from ...domain.submission import License, Submission, WithdrawalRequest, \
CrossListClassificationRequest
from ...domain.agent import Agent, User
from .models import Base
from .exceptions import ClassicBaseException, NoSuchSubmission, \
TransactionFailed, Unavailable, ConsistencyError
from .util import transaction, current_session, db
from .event import DBEvent
from . import models, util, interpolate, log, proposal, load
logger = logging.getLogger(__name__)
logger.propagate = False
[docs]def handle_operational_errors(func):
"""Catch SQLAlchemy OperationalErrors and raise :class:`.Unavailable`."""
@wraps(func)
def inner(*args, **kwargs):
try:
return func(*args, **kwargs)
except OperationalError as e:
logger.error('Encountered an error talking to database: %s', e)
raise Unavailable('Classic database unavailable') from e
return inner
[docs]def is_available(**kwargs: Any) -> bool:
"""Check our connection to the database."""
try:
current_session().query("1").from_statement(text("SELECT 1")).all()
except Exception as e:
logger.error('Encountered an error talking to database: %s', e)
return False
return True
[docs]@retry(ClassicBaseException, tries=3, delay=1)
@handle_operational_errors
def get_licenses() -> List[License]:
"""Get a list of :class:`.domain.License` instances available."""
license_data = current_session().query(models.License) \
.filter(models.License.active == '1')
return [License(uri=row.name, name=row.label) for row in license_data]
[docs]@retry(ClassicBaseException, tries=3, delay=1)
@handle_operational_errors
def get_events(submission_id: int) -> List[Event]:
"""
Load events from the classic database.
Parameters
----------
submission_id : int
Returns
-------
list
Items are :class:`.Event` instances loaded from the class DB.
Raises
------
:class:`.classic.exceptions.NoSuchSubmission`
Raised when there are no events for the provided submission ID.
"""
session = current_session()
event_data = session.query(DBEvent) \
.filter(DBEvent.submission_id == submission_id) \
.order_by(DBEvent.created)
events = [datum.to_event() for datum in event_data]
if not events: # No events, no dice.
logger.error('No events for submission %s', submission_id)
raise NoSuchSubmission(f'Submission {submission_id} not found')
return events
[docs]@retry(ClassicBaseException, tries=3, delay=1)
@handle_operational_errors
def get_user_submissions_fast(user_id: int) -> List[Submission]:
"""
Get active NG submissions for a user.
This should not return submissions for which there are no events.
Uses the same approach as :func:`get_submission_fast`.
Parameters
----------
submission_id : int
Returns
-------
list
Items are the user's :class:`.domain.submission.Submission` instances.
"""
session = current_session()
db_submissions = list(
session.query(models.Submission)
.filter(models.Submission.submitter_id == user_id)
.join(DBEvent) # Only get submissions that are also in the event table
.order_by(models.Submission.doc_paper_id.desc())
)
grouped = groupby(db_submissions, key=lambda dbs: dbs.doc_paper_id)
submissions: List[Submission] = []
for arxiv_id, dbss in grouped:
logger.debug('Handle group for arXiv ID %s: %s', arxiv_id, dbss)
if arxiv_id is None: # This is an unannounced submission.
for dbs in dbss: # Each row represents a separate e-print.
submissions.append(load.to_submission(dbs))
else:
dbss = sorted(dbss, key=lambda dbs: dbs.submission_id)
submissions.append(load.load(dbss))
return [subm for subm in submissions if subm and not subm.is_deleted]
[docs]@retry(ClassicBaseException, tries=3, delay=1)
@handle_operational_errors
def get_submission_fast(submission_id: int) -> List[Submission]:
"""
Get the projection of the submission directly.
Instead of playing events forward, we grab the most recent snapshot of the
submission in the database. Since classic represents the submission using
several rows, we have to grab all of them and transform/patch as
appropriate.
Parameters
----------
submission_id : int
Returns
-------
:class:`.domain.submission.Submission`
Raises
------
:class:`.classic.exceptions.NoSuchSubmission`
Raised when there are is no submission for the provided submission ID.
"""
return load.load(_get_db_submission_rows(submission_id))
# @retry(ClassicBaseException, tries=3, delay=1)
[docs]@handle_operational_errors
def get_submission(submission_id: int, for_update: bool = False) \
-> Tuple[Submission, List[Event]]:
"""
Get the current state of a submission from the database.
In the medium term, services that use this package will need to
play well with legacy services that integrate with the classic
database. For example, the moderation system does not use the event
model implemented here, and will therefore cause direct changes to the
submission tables that must be reflected in our representation of the
submission.
Until those legacy components are replaced, this function loads both the
event stack and the current DB state of the submission, and uses the DB
state to patch fields that may have changed outside the purview of the
event model.
Parameters
----------
submission_id : int
Returns
-------
:class:`.domain.submission.Submission`
list
Items are :class:`Event` instances.
"""
# Let the caller determine the transaction scope.
session = current_session()
original_row = session.query(models.Submission) \
.filter(models.Submission.submission_id == submission_id)
if for_update:
# Gives us SELECT ... FOR READ. In other words, lock this row for
# writing, but allow other clients to read from it in the meantime.
original_row = original_row.with_for_update(read=True)
try:
original_row = original_row.one()
logger.debug('Got row %s', original_row)
except NoResultFound as exc:
logger.debug('Got NoResultFound exception %s', exc)
raise NoSuchSubmission(f'Submission {submission_id} not found')
# May also raise MultipleResultsFound; if so, we want to fail loudly.
# Load any subsequent submission rows (e.g. v=2, jref, withdrawal).
# These do not have the same legacy submission ID as the original
# submission.
subsequent_rows: List[models.Submission] = []
arxiv_id = original_row.get_arxiv_id()
if arxiv_id is not None:
subsequent_rows = session.query(models.Submission) \
.filter(models.Submission.doc_paper_id == arxiv_id) \
.filter(models.Submission.submission_id != submission_id) \
.order_by(models.Submission.submission_id.asc())
if for_update: # Lock these rows as well.
subsequent_rows = subsequent_rows.with_for_update(read=True)
subsequent_rows = list(subsequent_rows) # Execute query.
logger.debug('Got subsequent_rows: %s', subsequent_rows)
try:
_events = get_events(submission_id)
except NoSuchSubmission:
_events = []
# If this submission originated in the classic system, we will have usable
# rows from the submission table, and either no events or events that do
# not start with a CreateSubmission event. In that case, fall back to
# ``load.load()``, which relies only on classic rows.
if not _events or not isinstance(_events[0], CreateSubmission):
logger.info('Loading a classic submission: %s', submission_id)
submission = load.load([original_row] + subsequent_rows)
if submission is None:
raise NoSuchSubmission('No such submission')
return submission, []
# We have an NG-native submission.
interpolator = interpolate.ClassicEventInterpolator(
original_row,
subsequent_rows,
_events
)
return interpolator.get_submission_state()
# @retry(ClassicBaseException, tries=3, delay=1)
[docs]@handle_operational_errors
def store_event(event: Event, before: Optional[Submission],
after: Optional[Submission],
*call: List[Callable]) -> Tuple[Event, Submission]:
"""
Store an event, and update submission state.
This is where we map the NG event domain onto the classic database. The
main differences are that:
- In the event domain, a submission is a single stream of events, but
in the classic system we create new rows in the submission database
for things like replacements, adding DOIs, and withdrawing papers.
- In the event domain, the only concept of the announced paper is the
paper ID. In the classic submission database, we also have to worry about
the row in the Document database.
We assume that the submission states passed to this function have the
correct paper ID and version number, if announced. The submission ID on
the event and the before/after states refer to the original classic
submission only.
Parameters
----------
event : :class:`Event`
before : :class:`Submission`
The state of the submission before the event occurred.
after : :class:`Submission`
The state of the submission after the event occurred.
call : list
Items are callables that accept args ``Event, Submission, Submission``.
These are called within the transaction context; if an exception is
raised, the transaction is rolled back.
"""
# Let the caller determine the transaction scope.
session = current_session()
if event.committed:
raise TransactionFailed('%s already committed', event.event_id)
logger.debug('store event %s', event.event_type)
doc_id: Optional[int] = None
# This is the case that we have a new submission.
if before is None and isinstance(after, Submission):
dbs = models.Submission(type=models.Submission.NEW_SUBMISSION)
dbs.update_from_submission(after)
this_is_a_new_submission = True
else: # Otherwise we're making an update for an existing submission.
this_is_a_new_submission = False
# After the original submission is announced, a new Document row is
# created. This Document is shared by all subsequent Submission rows.
if before.is_announced:
doc_id = _load_document_id(before.arxiv_id, before.version)
JREFEvents = [SetDOI, SetJournalReference, SetReportNumber]
# From the perspective of the database, a replacement is mainly an
# incremented version number. This requires a new row in the
# database.
if after.version > before.version:
dbs = _create_replacement(doc_id, before.arxiv_id,
after.version, after, event.created)
elif isinstance(event, Rollback) and before.version > 1:
dbs = _delete_replacement(doc_id, before.arxiv_id,
before.version)
# Withdrawals also require a new row, and they use the most recent
# version number.
elif isinstance(event, RequestWithdrawal):
dbs = _create_withdrawal(doc_id, event.reason,
before.arxiv_id, after.version, after,
event.created)
elif isinstance(event, RequestCrossList):
dbs = _create_crosslist(doc_id, event.categories,
before.arxiv_id, after.version, after,
event.created)
# Adding DOIs and citation information (so-called "journal reference")
# also requires a new row. The version number is not incremented.
elif before.is_announced and type(event) in JREFEvents:
dbs = _create_jref(doc_id, before.arxiv_id, after.version, after,
event.created)
elif isinstance(event, CancelRequest):
dbs = _cancel_request(event, before, after)
# The submission has been announced.
elif isinstance(before, Submission) and before.arxiv_id is not None:
dbs = _load(paper_id=before.arxiv_id, version=before.version)
_preserve_sticky_hold(dbs, before, after, event)
dbs.update_from_submission(after)
# The submission has not yet been announced; we're working with a
# single row.
elif isinstance(before, Submission) and before.submission_id:
dbs = _load(before.submission_id)
_preserve_sticky_hold(dbs, before, after, event)
dbs.update_from_submission(after)
else:
raise TransactionFailed("Something is fishy")
db_event = _new_dbevent(event)
session.add(dbs)
session.add(db_event)
# Make sure that we get a submission ID; note that this # does not commit
# the transaction, just pushes the # SQL that we have generated so far to
# the database # server.
session.flush()
log.handle(event, before, after) # Create admin log entry.
for func in call:
logger.debug('call %s with event %s', func, event.event_id)
func(event, before, after)
if isinstance(event, AddProposal):
proposal.add(event, before, after)
# Attach the database object for the event to the row for the
# submission.
if this_is_a_new_submission: # Update in transaction.
db_event.submission = dbs
else: # Just set the ID directly.
db_event.submission_id = before.submission_id
event.committed = True
# Update the domain event and submission states with the submission ID.
# This should carry forward the original submission ID, even if the
# classic database has several rows for the submission (with different
# IDs).
if this_is_a_new_submission:
event.submission_id = dbs.submission_id
after.submission_id = dbs.submission_id
else:
event.submission_id = before.submission_id
after.submission_id = before.submission_id
return event, after
[docs]@retry(ClassicBaseException, tries=3, delay=1)
@handle_operational_errors
def get_titles(since: datetime) -> List[Tuple[int, str, Agent]]:
"""Get titles from submissions created on or after a particular date."""
# TODO: consider making this a param, if we need this function for anything
# else.
STATUSES_TO_CHECK = [
models.Submission.SUBMITTED,
models.Submission.ON_HOLD,
models.Submission.NEXT_PUBLISH_DAY,
models.Submission.REMOVED,
models.Submission.USER_DELETED,
models.Submission.DELETED_ON_HOLD,
models.Submission.DELETED_PROCESSING,
models.Submission.DELETED_REMOVED,
models.Submission.DELETED_USER_EXPIRED
]
session = current_session()
q = session.query(
models.Submission.submission_id,
models.Submission.title,
models.Submission.submitter_id,
models.Submission.submitter_email
)
q = q.filter(models.Submission.status.in_(STATUSES_TO_CHECK))
q = q.filter(models.Submission.created >= since)
return [
(submission_id, title, User(native_id=user_id, email=user_email))
for submission_id, title, user_id, user_email in q.all()
]
# Private functions down here.
def _load(submission_id: Optional[int] = None, paper_id: Optional[str] = None,
version: Optional[int] = 1, row_type: Optional[str] = None) \
-> models.Submission:
if row_type is not None:
limit_to = [row_type]
else:
limit_to = [models.Submission.NEW_SUBMISSION,
models.Submission.REPLACEMENT]
session = current_session()
if submission_id is not None:
submission = session.query(models.Submission) \
.filter(models.Submission.submission_id == submission_id) \
.filter(models.Submission.type.in_(limit_to)) \
.one()
elif submission_id is None and paper_id is not None:
submission = session.query(models.Submission) \
.filter(models.Submission.doc_paper_id == paper_id) \
.filter(models.Submission.version == version) \
.filter(models.Submission.type.in_(limit_to)) \
.order_by(models.Submission.submission_id.desc()) \
.first()
else:
submission = None
if submission is None:
raise NoSuchSubmission("No submission row matches those parameters")
return submission
def _cancel_request(event, before, after):
request = before.user_requests[event.request_id]
if isinstance(request, WithdrawalRequest):
row_type = models.Submission.WITHDRAWAL
elif isinstance(request, CrossListClassificationRequest):
row_type = models.Submission.CROSS_LIST
dbs = _load(paper_id=before.arxiv_id, version=before.version,
row_type=row_type)
dbs.status = models.Submission.USER_DELETED
return dbs
def _load_document_id(paper_id: str, version: int) -> int:
logger.debug('get document ID with %s and %s', paper_id, version)
session = current_session()
document_id = session.query(models.Submission.document_id) \
.filter(models.Submission.doc_paper_id == paper_id) \
.filter(models.Submission.version == version) \
.first()
if document_id is None:
raise NoSuchSubmission("No submission row matches those parameters")
return document_id[0]
def _create_replacement(document_id: int, paper_id: str, version: int,
submission: Submission, created: datetime) \
-> models.Submission:
"""
Create a new replacement submission.
From the perspective of the database, a replacement is mainly an
incremented version number. This requires a new row in the database.
"""
dbs = models.Submission(type=models.Submission.REPLACEMENT,
document_id=document_id, version=version)
dbs.update_from_submission(submission)
dbs.created = created
dbs.updated = created
dbs.doc_paper_id = paper_id
dbs.status = models.Submission.NOT_SUBMITTED
return dbs
def _delete_replacement(document_id: int, paper_id: str, version: int) \
-> models.Submission:
session = current_session()
dbs = session.query(models.Submission) \
.filter(models.Submission.doc_paper_id == paper_id) \
.filter(models.Submission.version == version) \
.filter(models.Submission.type == models.Submission.REPLACEMENT) \
.order_by(models.Submission.submission_id.desc()) \
.first()
dbs.status = models.Submission.USER_DELETED
return dbs
def _create_withdrawal(document_id: int, reason: str, paper_id: str,
version: int, submission: Submission,
created: datetime) -> models.Submission:
"""
Create a new withdrawal request.
Withdrawals also require a new row, and they use the most recent version
number.
"""
dbs = models.Submission(type=models.Submission.WITHDRAWAL,
document_id=document_id,
version=version)
dbs.update_withdrawal(submission, reason, paper_id, version, created)
return dbs
def _create_crosslist(document_id: int, categories: List[str], paper_id: str,
version: int, submission: Submission,
created: datetime) -> models.Submission:
"""
Create a new crosslist request.
Cross list requests also require a new row, and they use the most recent
version number.
"""
dbs = models.Submission(type=models.Submission.CROSS_LIST,
document_id=document_id,
version=version)
dbs.update_cross(submission, categories, paper_id, version, created)
return dbs
def _create_jref(document_id: int, paper_id: str, version: int,
submission: Submission,
created: datetime) -> models.Submission:
"""
Create a JREF submission.
Adding DOIs and citation information (so-called "journal reference") also
requires a new row. The version number is not incremented.
"""
# Try to piggy-back on an existing JREF row. In the classic system, all
# three fields can get updated on the same row.
try:
most_recent_sb = _load(paper_id=paper_id, version=version,
row_type=models.Submission.JOURNAL_REFERENCE)
if most_recent_sb and not most_recent_sb.is_announced():
most_recent_sb.update_from_submission(submission)
return most_recent_sb
except NoSuchSubmission:
pass
# Otherwise, create a new JREF row.
dbs = models.Submission(type=models.Submission.JOURNAL_REFERENCE,
document_id=document_id, version=version)
dbs.update_from_submission(submission)
dbs.created = created
dbs.updated = created
dbs.doc_paper_id = paper_id
dbs.status = models.Submission.PROCESSING_SUBMISSION
return dbs
def _new_dbevent(event: Event) -> DBEvent:
"""Create an event entry in the database."""
return DBEvent(event_type=event.event_type,
event_id=event.event_id,
event_version=_get_app_version(),
data=asdict(event),
created=event.created,
creator=asdict(event.creator),
proxy=asdict(event.proxy) if event.proxy else None)
def _preserve_sticky_hold(dbs: models.Submission, before: Submission,
after: Submission, event: Event) -> None:
if dbs.status != models.Submission.ON_HOLD:
return
if dbs.is_on_hold() and after.status == Submission.WORKING:
dbs.sticky_status = models.Submission.ON_HOLD
def _get_db_submission_rows(submission_id: int) -> List[models.Submission]:
session = current_session()
head = session.query(models.Submission.submission_id,
models.Submission.doc_paper_id) \
.filter_by(submission_id=submission_id) \
.subquery()
dbss = list(
session.query(models.Submission)
.filter(or_(models.Submission.submission_id == submission_id,
models.Submission.doc_paper_id == head.c.doc_paper_id))
.order_by(models.Submission.submission_id.desc())
)
if not dbss:
raise NoSuchSubmission('No submission found')
return dbss
def _get_app_version() -> str:
return get_application_config().get('CORE_VERSION', '0.0.0')
[docs]def init_app(app: Flask) -> None:
"""Register the SQLAlchemy extension to an application."""
db.init_app(app)
@app.teardown_request
def teardown_request(exception) -> None:
if exception:
db.session.rollback()
db.session.remove()
@app.teardown_appcontext
def teardown_appcontext(*args, **kwargs) -> None:
db.session.rollback()
db.session.remove()
[docs]def create_all() -> None:
"""Create all tables in the database."""
Base.metadata.create_all(db.engine)
[docs]def drop_all() -> None:
"""Drop all tables in the database."""
Base.metadata.drop_all(db.engine)
def _get_db_submission_rows(submission_id: int) -> List[models.Submission]:
session = current_session()
head = session.query(models.Submission.submission_id,
models.Submission.doc_paper_id) \
.filter_by(submission_id=submission_id) \
.subquery()
dbss = list(
session.query(models.Submission)
.filter(or_(models.Submission.submission_id == submission_id,
models.Submission.doc_paper_id == head.c.doc_paper_id))
.order_by(models.Submission.submission_id.desc())
)
if not dbss:
raise NoSuchSubmission('No submission found')
return dbss