janet/janet/__init__.py

52 lines
1.3 KiB
Python

import abc
import logging
from dataclasses import dataclass
from typing import Any, Callable, Optional, Sequence
logger = logging.getLogger('janet')
@dataclass
class Device:
identifiers: Sequence[str]
available: bool
name: str
manufacturer: Optional[str] = None
model: Optional[str] = None
sw_version: Optional[str] = None
kind: Optional[str] = None
class Channel:
def __init__(self):
self._listeners = set()
def put(self, source: Device, entity: Any) -> None:
remove = set()
for listener in self._listeners:
try:
if listener(source, entity):
remove.add(listener)
except IOError as ex:
logger.warning(f'Listener raised {ex}, removing', exc_info=ex)
remove.add(listener)
except Exception as ex:
logger.error(f'Listener raised fatal exception {ex}, removing',
exc_info=ex)
remove.add(listener)
self._listeners ^= remove
def listen(self, listener: Callable[[Device, Any], None]) -> None:
self._listeners.add(listener)
class Publisher(abc.ABC):
@abc.abstractmethod
def publish(self,
device: Device,
entity: Any,
setter: Optional[Callable] = None):
pass