Source code for arxiv.canonical.log.log

"""Provides the write log for the canonical record."""

import json
import os
from datetime import datetime
from typing import Iterable, Optional

from backports.datetime_fromisoformat import MonkeyPatch
from pytz import timezone

from .. import domain as D

MonkeyPatch.patch_fromisoformat()

Action = str
Outcome = str

SUCCEEDED: Outcome = 'SUCCESS'
FAILED: Outcome = 'FAILED'

DEREFERENCE: Action = 'DEREF'
READ: Action = 'READ'
WRITE: Action = 'WRITE'

ET = timezone('US/Eastern')


[docs]class LogEntry: timestamp: datetime """The time of the log entry.""" event_id: D.EventIdentifier """Identifier of the event being handled.""" # key: D.Key # """Specific key being handled.""" action: Action """Action being performed by the agent.""" state: Outcome """Outcome of the action.""" message: str """Additional unstructured information about the action.""" def __init__(self, timestamp: datetime, event_id: D.EventIdentifier, # key: D.Key, action: Action, state: Outcome, message: str) -> None: self.timestamp = timestamp self.event_id = event_id self.action = action self.state = state self.message = message
[docs] @classmethod def from_repr(cls, repr: str) -> 'LogEntry': data = json.loads(repr) return cls( timestamp=datetime.fromisoformat(data['timestamp']), # type: ignore ; pylint: disable=no-member event_id=D.EventIdentifier(data['event_id']), # key=D.Key(data['key']), action=data['action'], state=data['state'], message=data.get('message', '') )
def __repr__(self) -> str: return json.dumps({ 'timestamp': self.timestamp.isoformat(), 'event_id': self.event_id, 'action': self.action, 'state': self.state, 'message': self.message })
[docs]class Log: """Action log for a canonical agent.""" def __init__(self, path: str) -> None: """Initialize with a reader and writer.""" self.path = os.path.abspath(path) if not os.path.exists(self.path): raise RuntimeError(f'No such path: {self.path}') try: self._writer = open(self.current_log_path, 'a') self._reader = open(self.current_log_path, 'r') except Exception as e: raise RuntimeError(f'Could not open {self.path} for writing') @property def current_log_path(self) -> str: """The path to the current log file.""" return f'{self.path}/.{datetime.now(ET).date().isoformat()}.log'
[docs] def write(self, event_id: D.EventIdentifier, action: Action, state: Outcome, message: str) -> LogEntry: """Write a log entry.""" entry = LogEntry(datetime.now(ET), event_id, action, state, message) self._writer.write(f'{entry}\n') self._writer.flush() # So that the reader can see what's up. return entry
[docs] def log_success(self, event_id: D.EventIdentifier, # key: D.Key, action: Action, message: str = '') -> LogEntry: """Log a successful action.""" return self.write(event_id, action, SUCCEEDED, message)
[docs] def log_failure(self, event_id: D.EventIdentifier, # key: D.Key, action: Action, message: str = '') -> LogEntry: """Log a failed action.""" return self.write(event_id, action, FAILED, message)
[docs] def read_last_entry(self) -> LogEntry: """Read the last entry in the log.""" entry = LogEntry.from_repr(self._reader.readlines()[-1]) self._reader.seek(0) return entry
[docs] def read_last_succeeded(self) -> Optional[LogEntry]: """Read the last SUCCEEDED entry in the log.""" lines = self._reader.readlines() for i in range(1, len(lines)): entry = LogEntry.from_repr(lines[-i]) if entry.state == SUCCEEDED: return entry return None
[docs] def read_all(self) -> Iterable[LogEntry]: for line in self._reader.readlines(): yield LogEntry.from_repr(line)