import abc import logging from dataclasses import dataclass from typing import Any, Callable, Optional, Sequence logger = logging.getLogger('janet') @dataclass class Device: identifiers: Sequence[str] available: bool name: str manufacturer: Optional[str] = None model: Optional[str] = None sw_version: Optional[str] = None kind: Optional[str] = None class Channel: def __init__(self): self._listeners = set() def put(self, source: Device, entity: Any) -> None: remove = set() for listener in self._listeners: try: if listener(source, entity): remove.add(listener) except IOError as ex: logger.warning(f'Listener raised {ex}, removing', exc_info=ex) remove.add(listener) except Exception as ex: logger.error(f'Listener raised fatal exception {ex}, removing', exc_info=ex) remove.add(listener) self._listeners ^= remove def listen(self, listener: Callable[[Device, Any], None]) -> None: self._listeners.add(listener) class Publisher(abc.ABC): @abc.abstractmethod def publish(self, device: Device, entity: Any, setter: Optional[Callable] = None): pass