Source code for arxiv.canonical.role.stream
from abc import ABC
from typing import Any, List, Optional, Sequence
from .. import domain as D
from ..core import IEventStream, ICanonicalSource
from .proxy import EventStreamProxy
class StreamRole(ABC):
    """Base class describing how a component may use an event stream.

    Subclasses declare the stream operations they support by overriding
    ``event_supported``. That list is handed to ``EventStreamProxy``
    together with the underlying stream when :meth:`set_stream` is called.
    """

    # Names of the stream operations this role supports. Empty on the base
    # class; subclasses override it.
    event_supported: List[str] = []

    # Proxy installed by ``set_stream``. Declared here with a ``None``
    # default so that reading ``stream`` too early hits the explicit check
    # below instead of failing on a missing attribute.
    _stream: Optional[IEventStream] = None

    @property
    def stream(self) -> IEventStream:
        """Return the stream proxy installed by :meth:`set_stream`.

        Raises
        ------
        AttributeError
            If :meth:`set_stream` has not been called on this instance.

        """
        # An explicit check rather than ``assert``: asserts are removed when
        # Python runs with ``-O``, which would let ``None`` leak to callers.
        # ``AttributeError`` matches what callers previously observed when
        # the stream had not been set.
        if self._stream is None:
            raise AttributeError(
                f'{type(self).__name__} has no stream;'
                ' call set_stream() first'
            )
        return self._stream

    def set_stream(self, stream: IEventStream,
                   sources: Sequence[ICanonicalSource],
                   name: str = 'all') -> None:
        """Wrap ``stream`` in an ``EventStreamProxy`` and store it.

        Parameters
        ----------
        stream : IEventStream
            Underlying event stream to wrap.
        sources : Sequence[ICanonicalSource]
            Not used by this implementation; kept as part of the signature.
        name : str
            Not used by this implementation; kept as part of the signature.

        """
        self._stream = EventStreamProxy(stream, self.event_supported)
class NoStream(StreamRole, ABC):
    """Stream role that adds nothing to ``StreamRole``.

    ``event_supported`` is not overridden, so the value inherited from
    ``StreamRole`` applies.
    """
class Listener(StreamRole, ABC):
    """Stream role whose only supported operation is ``'listen'``."""

    event_supported = ["listen"]

    def on_event(self, event: D.Event) -> None:
        """Hook called with an event; child classes must override it.

        Raises
        ------
        NotImplementedError
            Always, in this base implementation.

        """
        message = 'Must be implemented by a child class'
        raise NotImplementedError(message)
class Emitter(StreamRole, ABC):
    """Stream role whose only supported operation is ``'emit'``."""

    event_supported = ["emit"]