Source code for arxiv.submission.domain.event.process

"""Events related to external or long-running processes."""

from typing import Optional

from dataclasses import field

from ...exceptions import InvalidEvent
from ..submission import Submission
from ..process import ProcessStatus
from .base import Event
from .util import dataclass


[docs]@dataclass() class AddProcessStatus(Event): """Add the status of an external/long-running process to a submission.""" NAME = "add status of a process" NAMED = "added status of a process" Status = ProcessStatus.Status process_id: Optional[str] = field(default=None) process: Optional[str] = field(default=None) step: Optional[str] = field(default=None) status: Status = field(default=Status.PENDING) reason: Optional[str] = field(default=None) def __post_init__(self) -> None: """Make sure our enums are in order.""" super(AddProcessStatus, self).__post_init__() self.status = self.Status(self.status)
[docs] def validate(self, submission: Submission) -> None: """Verify that we have a :class:`.ProcessStatus`.""" if self.process is None: raise InvalidEvent(self, "Must include process")
[docs] def project(self, submission: Submission) -> Submission: """Add the process status to the submission.""" submission.processes.append(ProcessStatus( creator=self.creator, created=self.created, process=self.process, step=self.step, status=self.status, reason=self.reason )) return submission