52 lines
1.3 KiB
Python
52 lines
1.3 KiB
Python
import abc
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from typing import Any, Callable, Optional, Sequence
|
|
|
|
# Logger registered under the fixed name 'janet'; Channel.put uses it to
# report listeners that raise.
logger = logging.getLogger('janet')
|
|
|
|
|
|
@dataclass
class Device:
    """Plain record describing a device.

    The three leading fields are required; the remaining descriptive
    fields are optional and default to ``None``. Field order is the
    positional order of the generated ``__init__``.
    """

    # --- required -------------------------------------------------------
    identifiers: Sequence[str]  # one or more string identifiers
    available: bool             # availability flag
    name: str                   # device name

    # --- optional metadata (None when not supplied) ---------------------
    manufacturer: Optional[str] = None
    model: Optional[str] = None
    sw_version: Optional[str] = None
    kind: Optional[str] = None
|
|
|
|
|
|
class Channel:
    """Fan-out hub that forwards ``(source, entity)`` pairs to listeners.

    Listeners are callables registered with :meth:`listen`. A listener
    stays registered until it returns a truthy value from a call or raises
    an exception, at which point :meth:`put` drops it.
    """

    def __init__(self):
        # A set, so registering the same callable twice has no extra effect.
        self._listeners = set()

    def put(self, source: Device, entity: Any) -> None:
        """Deliver ``entity`` originating from ``source`` to every listener.

        Each listener is invoked as ``listener(source, entity)``.

        * A truthy return value unsubscribes the listener.
        * An ``IOError`` is logged as a warning and unsubscribes the listener.
        * Any other ``Exception`` is logged as an error and unsubscribes the
          listener; it is not propagated to the caller.

        Listeners may call :meth:`listen` or :meth:`put` on this same channel
        while being notified. Listeners registered during a ``put`` are first
        notified on the next ``put``.
        """
        remove = set()

        # Iterate over a snapshot: a listener that registers another listener
        # (or triggers a nested put) would otherwise mutate the set under
        # iteration and raise "Set changed size during iteration".
        for listener in tuple(self._listeners):
            try:
                if listener(source, entity):
                    remove.add(listener)
            except IOError as ex:
                logger.warning(f'Listener raised {ex}, removing', exc_info=ex)
                remove.add(listener)
            except Exception as ex:
                logger.error(f'Listener raised fatal exception {ex}, removing',
                             exc_info=ex)
                remove.add(listener)

        # Plain difference, not symmetric difference: a listener that a
        # nested put() already removed must not be re-added here.
        self._listeners -= remove

    def listen(self, listener: Callable[[Device, Any], Any]) -> None:
        """Register ``listener`` to be called on every subsequent ``put``.

        The listener receives ``(source, entity)``. Returning a truthy value
        unsubscribes it; returning ``None`` (or any falsy value) keeps it
        registered.
        """
        self._listeners.add(listener)
|
|
|
|
|
|
class Publisher(abc.ABC):
    """Abstract base class for publishers.

    Subclasses must override :meth:`publish`; the class cannot be
    instantiated until they do.
    """

    @abc.abstractmethod
    def publish(self, device: Device, entity: Any,
                setter: Optional[Callable] = None):
        """Publish ``entity`` for ``device``.

        ``setter`` is an optional callable that defaults to ``None``; how it
        is used is left to the concrete implementation. The base
        implementation does nothing and returns ``None``.
        """
|