Source code for arxiv.canonical.role.stream


from abc import ABC
from typing import Any, List, Optional, Sequence

from .. import domain as D
from ..core import IEventStream, ICanonicalSource

from .proxy import EventStreamProxy


class StreamRole(ABC):
    """Base class for roles that hold a reference to an event stream.

    Subclasses override :attr:`event_supported` to declare which stream
    operations they use. That list is passed to ``EventStreamProxy``
    together with the raw stream in :meth:`set_stream`.
    """

    # Names of the stream operations this role uses; the base role has none.
    event_supported: List[str] = []

    # Stays ``None`` until :meth:`set_stream` is called. Declared at class
    # level so that reading :attr:`stream` too early fails with the intended
    # AssertionError rather than an AttributeError.
    _stream: Optional[IEventStream] = None

    @property
    def stream(self) -> IEventStream:
        """Return the stream installed by :meth:`set_stream`.

        Raises
        ------
        AssertionError
            If no stream has been set yet.

        """
        # Raised explicitly (not via ``assert``) so the check still runs
        # when Python is started with ``-O``.
        if self._stream is None:
            raise AssertionError(
                'No stream has been set; call set_stream() first'
            )
        return self._stream

    def set_stream(self, stream: IEventStream,
                   sources: Sequence[ICanonicalSource],
                   name: str = 'all') -> None:
        """Wrap ``stream`` in an ``EventStreamProxy`` and store it.

        Parameters
        ----------
        stream : IEventStream
            The underlying event stream.
        sources : Sequence[ICanonicalSource]
            Not used by this implementation; kept as part of the signature.
        name : str
            Not used by this implementation; kept as part of the signature.

        """
        self._stream = EventStreamProxy(stream, self.event_supported)
class NoStream(StreamRole, ABC):
    """Stream role that adds nothing to :class:`StreamRole`.

    It inherits the empty ``event_supported`` list from the base class.
    """
class Listener(StreamRole, ABC):
    """Stream role that declares ``'listen'`` as its supported operation."""

    event_supported = ['listen']

    def on_event(self, event: D.Event) -> None:
        """Handle ``event``; this base implementation always raises.

        Raises
        ------
        NotImplementedError
            Always; child classes are expected to override this method.

        """
        message = 'Must be implemented by a child class'
        raise NotImplementedError(message)
class Emitter(StreamRole, ABC):
    """Stream role that declares ``'emit'`` as its supported operation."""

    event_supported = ['emit']