"""Publish dataclass entities over MQTT with Home Assistant discovery configs."""

import dataclasses
import datetime
import enum
import hashlib
import json
import logging
import math
import re
import threading
from typing import Any, Dict, Optional, Union

import paho.mqtt.client as mqtt
import pint

from . import Device, Publisher

logger = logging.getLogger('janet.mqtt')

_ureg = pint.get_application_registry()

# Units that translate to a discovery ``device_class``.
_DEVICE_CLASSES = {
    _ureg.amp: 'current',
    _ureg.volt: 'voltage',
    _ureg.watt: 'power',
    _ureg.degC: 'temperature',
    _ureg.watt_hour: 'energy',
}

# Unit symbols published as ``unit_of_measurement``.
_UNITS = {
    _ureg.amp: 'A',
    _ureg.volt: 'V',
    _ureg.watt: 'W',
    _ureg.degC: '°C',
    _ureg.watt_hour: 'Wh',
    _ureg.second: 's',
    _ureg.hertz: 'Hz',
    _ureg.hour: 'h',
}

# Decimal places kept when publishing numeric state.
_PRECISION = 6


def _rounded(number: Any) -> Any:
    """Round *number* to ``_PRECISION`` places, or return ``None`` if not finite.

    NaN and infinities are rejected because ``json.dumps`` would emit them as
    ``NaN``/``Infinity``, which are not valid JSON.
    """
    rounded = round(number, _PRECISION)
    if isinstance(rounded, float) and not math.isfinite(rounded):
        return None
    return rounded


def _to_state(value: Any) -> Union[int, float, str, None]:
    """Convert a field value into its JSON-serialisable state.

    Returns ``None`` for unsupported types and for non-finite numbers; callers
    treat ``None`` as "do not publish this field".
    """
    if isinstance(value, pint.Quantity):
        return _rounded(value.m)
    if isinstance(value, datetime.datetime):
        return value.isoformat()
    # IntEnum and bool are both ints, so they must be tested before int.
    if isinstance(value, enum.IntEnum):
        return value.name
    if isinstance(value, bool):
        return 'ON' if value else 'OFF'
    if isinstance(value, str):
        return value
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        return _rounded(value)
    return None


def _match_unit(table: Dict[Any, str], units: Any) -> Optional[str]:
    """Return the label in *table* whose key compares equal to *units*."""
    # Plain equality scan: matching relies only on the units' ``==``.
    return next(
        (label for unit, label in table.items() if unit == units), None)


def _to_device_class(value) -> Optional[str]:
    """Return the discovery ``device_class`` for *value*, if one applies."""
    if isinstance(value, pint.Quantity):
        device_class = _match_unit(_DEVICE_CLASSES, value.units)
        if device_class is None:
            logger.debug('No device class for units %s', value.units)
        return device_class
    if isinstance(value, datetime.datetime):
        return 'timestamp'
    return None


def _to_units(value) -> Optional[str]:
    """Return the ``unit_of_measurement`` symbol for *value*, if known."""
    if isinstance(value, pint.Quantity):
        symbol = _match_unit(_UNITS, value.units)
        if symbol is None:
            logger.info('Unrecognised units %s', value.units)
        return symbol
    if isinstance(value, datetime.datetime):
        # NOTE(review): datetimes are published as ISO strings with device
        # class 'timestamp'; confirm a unit of 's' is really wanted here.
        return 's'
    return None


def _uid(entity, device: Device) -> str:
    """Build the topic/unique-id stem for *entity* belonging to *device*.

    The stem is ``<kind>_<id>`` for the device itself, and
    ``<kind>_<id>/<entity class name>`` (lower-cased, dots replaced by
    underscores) for any other entity. ``<id>`` is the first device identifier
    when it is a purely word-character string, otherwise a 13-hex-digit SHA-1
    prefix of the whole identifier list.
    """
    cls = entity.__class__
    # NOTE(review): the falsy-device guard only covers ``kind``; identifiers
    # are still read from ``device`` below.
    kind = (device.kind if device else None) or cls.__module__

    identifiers = device.identifiers
    first = identifiers[0] if len(identifiers) >= 1 else None
    # fullmatch (not match with '$') so a trailing newline cannot slip into a
    # topic name; non-string identifiers fall through to the hash.
    if isinstance(first, str) and re.fullmatch(r'\w+', first):
        uid = first
    else:
        uid = hashlib.sha1(str(identifiers).encode()).hexdigest()[:13]

    if entity == device:
        return f'{kind}_{uid}'
    return f'{kind}_{uid}/{cls.__name__}'.lower().replace('.', '_')


def _device(device: Device) -> dict:
    """Return the discovery ``device`` mapping, omitting empty optional keys."""
    dev = {
        'identifiers': device.identifiers,
        'name': device.name,
    }
    for key in ('manufacturer', 'model', 'sw_version'):
        value = getattr(device, key)
        if value:
            dev[key] = value
    return dev


def _new_client() -> mqtt.Client:
    """Create a paho client that uses the version-1 callback signatures.

    paho-mqtt 2.x requires an explicit callback API version; older releases do
    not define ``CallbackAPIVersion`` and take no such argument.
    """
    api_versions = getattr(mqtt, 'CallbackAPIVersion', None)
    if api_versions is None:
        return mqtt.Client()
    return mqtt.Client(api_versions.VERSION1)


@dataclasses.dataclass
class Setter:
    """A command-topic handler registered through ``Client.publish``."""

    # Dataclass field the command topic refers to.
    field: dataclasses.Field
    # Called as ``callback(field, payload)`` when a command message arrives.
    callback: Any


class Client(Publisher):
    """Publisher that sends entity state and discovery configs over MQTT."""

    def __init__(self, host: str, port: int = 1883):
        """Start a background MQTT connection to *host*:*port*."""
        # Guards _setters, which is shared with paho's network thread.
        self._lock = threading.Lock()
        self._setters: Dict[str, Setter] = {}
        self._client = _new_client()
        self._client.on_connect = self._on_connect
        self._client.on_message = self._on_message
        self._client.connect_async(host, port, 60)
        self._client.loop_start()

    def _on_connect(self, client, unused_userdata, unused_flags, rc):
        """(Re)subscribe to every registered command topic once connected.

        The connection is made asynchronously, so a ``subscribe`` issued from
        ``publish`` may happen before the broker is reachable, and the broker
        may forget subscriptions across a reconnect. Renewing them here covers
        both cases.
        """
        if rc != 0:
            logger.warning('MQTT connection refused (rc=%s)', rc)
            return
        with self._lock:
            commands = list(self._setters)
        for command in commands:
            client.subscribe(command)

    def _on_message(self, unused_client, unused_userdata,
                    message: mqtt.MQTTMessage):
        """Dispatch an incoming command message to its registered setter."""
        with self._lock:
            setter = self._setters.get(message.topic)
        if setter is None:
            return
        # Run the callback outside the lock: it is not re-entrant and the
        # callback may call publish(). Contain failures so a bad payload
        # cannot take down the MQTT network thread.
        try:
            setter.callback(setter.field, message.payload)
        except Exception:
            logger.exception('Setter for %s failed', message.topic)

    def _register_setter(self, command: str, field: dataclasses.Field,
                         setter: Any) -> None:
        """Subscribe to *command* and remember its setter (first one wins)."""
        with self._lock:
            if command not in self._setters:
                self._client.subscribe(command)
                self._setters[command] = Setter(field, setter)

    def publish(self, device: Device, entity: Any, setter=None) -> None:
        """Publish *entity*'s state and a discovery config for each field.

        Args:
            device: Device the entity belongs to; when it has a non-``None``
                ``available`` attribute that is published (retained) too.
            entity: Dataclass instance whose fields are published.
            setter: Optional callable registered for every published field's
                ``.../set`` topic and invoked as ``setter(field, payload)``.
        """
        uid = _uid(entity, device)
        base = f'janet/{uid}'
        state_topic = f'{base}/state'

        # Read and convert each field exactly once.
        snapshot = []
        for field in dataclasses.fields(entity):
            value = getattr(entity, field.name)
            snapshot.append((field, value, _to_state(value)))

        states = {
            field.name: state
            for field, _, state in snapshot
            if state is not None
        }
        self._client.publish(state_topic, json.dumps(states))

        available = getattr(device, 'available', None)
        if available is not None:
            self._client.publish(
                f'{base}/power', 'ON' if available else 'OFF', retain=True)

        dev = _device(device)
        kind = entity.__class__.__name__
        for field, value, state in snapshot:
            if value is None:
                continue
            if state is None:
                logger.warning(
                    'Ignoring field %s type %s', field.name, field.type)
                continue

            if field.type in (bool, Optional[bool]):
                component = 'binary_sensor'
            else:
                component = 'sensor'
            config_topic = (
                f'homeassistant/{component}/{uid}_{field.name}/config')

            name = field.name.replace('_', ' ')
            config = {
                'name': f'{kind} {name}',
                'state_topic': state_topic,
                'value_template': f'{{{{value_json.{field.name}}}}}',
                'unique_id': f'{uid}_{field.name}',
                'device': dev,
                'expire_after': 600,
            }
            device_class = _to_device_class(value)
            if device_class:
                config['device_class'] = device_class
            units = _to_units(value)
            if units:
                config['unit_of_measurement'] = units

            if setter:
                self._register_setter(
                    f'{base}/{field.name}/set', field, setter)

            self._client.publish(config_topic, json.dumps(config))