Michael Hope
97f7adb535
All checks were successful
continuous-integration/drone/push Build is passing
201 lines
5.8 KiB
Python
201 lines
5.8 KiB
Python
import dataclasses
import datetime
import enum
import hashlib
import json
import logging
import math
import re
import threading
from typing import Any, Dict, Optional, Union

import paho.mqtt.client as mqtt
import pint

from . import Device, Publisher
|
|
|
|
# Module-wide logger; the channel name is fixed rather than derived from
# __name__.
logger = logging.getLogger('janet.mqtt')

# Application-wide pint registry, so the units below compare equal to the
# units carried by quantities created elsewhere through the same registry.
_ureg = pint.get_application_registry()

# pint unit -> value used for the `device_class` key of the discovery config.
# Units without an entry simply get no device class.
_DEVICE_CLASSES = {
    _ureg.amp: 'current',
    _ureg.volt: 'voltage',
    _ureg.watt: 'power',
    _ureg.degC: 'temperature',
    _ureg.watt_hour: 'energy',
}

# pint unit -> text used for the `unit_of_measurement` key of the discovery
# config.  This is a superset of the units in _DEVICE_CLASSES.
_UNITS = {
    _ureg.amp: 'A',
    _ureg.volt: 'V',
    _ureg.watt: 'W',
    _ureg.degC: '°C',
    _ureg.watt_hour: 'Wh',
    _ureg.second: 's',
    _ureg.hertz: 'Hz',
    _ureg.hour: 'h',
}
|
|
|
|
|
|
def _to_state(value: Any) -> Union[float, str, None]:
|
|
if isinstance(value, pint.Quantity):
|
|
return round(value.m, 6)
|
|
elif isinstance(value, datetime.datetime):
|
|
return value.isoformat()
|
|
elif isinstance(value, enum.IntEnum):
|
|
return value.name
|
|
elif isinstance(value, bool):
|
|
return 'ON' if value else 'OFF'
|
|
elif isinstance(value, str):
|
|
return value
|
|
elif isinstance(value, int):
|
|
return value
|
|
elif isinstance(value, float):
|
|
return round(value, 6)
|
|
else:
|
|
return None
|
|
|
|
|
|
def _to_device_class(value) -> Optional[str]:
    """Return the discovery ``device_class`` for a value, if there is one.

    Datetimes map to ``'timestamp'``.  Quantities are looked up by unit in
    ``_DEVICE_CLASSES``; a unit with no entry is logged at debug level and
    yields ``None``.  Every other value yields ``None``.
    """
    if isinstance(value, datetime.datetime):
        return 'timestamp'
    if not isinstance(value, pint.Quantity):
        return None

    # Units are matched with ==, one table entry at a time, rather than by
    # dict lookup.
    for known_unit, device_class in _DEVICE_CLASSES.items():
        if known_unit == value.units:
            return device_class

    logger.debug(f'No device class for units {value.units}')
    return None
|
|
|
|
|
|
def _to_units(value) -> Optional[str]:
    """Return the ``unit_of_measurement`` text for a value, if there is one.

    Quantities are looked up by unit in ``_UNITS``; a unit with no entry is
    logged at info level and yields ``None``.  Datetimes yield ``'s'`` and
    every other value yields ``None``.
    """
    if isinstance(value, datetime.datetime):
        # NOTE(review): _to_state() renders datetimes as ISO 8601 text, so a
        # unit of seconds looks questionable here - confirm it is intended.
        return 's'
    if not isinstance(value, pint.Quantity):
        return None

    symbol = next(
        (text for unit, text in _UNITS.items() if unit == value.units),
        None,
    )
    if symbol is None:
        logger.info(f'Unrecognised units {value.units}')
    return symbol
|
|
|
|
|
|
def _uid(entity, device: Device) -> str:
|
|
cls = entity.__class__
|
|
kind = device.kind if device else None
|
|
kind = kind or cls.__module__
|
|
id0 = device.identifiers[0] if len(device.identifiers) >= 1 else None
|
|
if id0 and re.match(r'^\w+$', id0):
|
|
uid = id0
|
|
else:
|
|
uid = hashlib.sha1(str(device.identifiers).encode()).hexdigest()[:13]
|
|
|
|
if entity == device:
|
|
return f'{kind}_{uid}'
|
|
else:
|
|
return f'{kind}_{uid}/{cls.__name__}'.lower().replace('.', '_')
|
|
|
|
|
|
def _device(device: Device) -> dict:
|
|
dev = {
|
|
'identifiers': device.identifiers,
|
|
'name': device.name,
|
|
}
|
|
if device.manufacturer:
|
|
dev['manufacturer'] = device.manufacturer
|
|
if device.model:
|
|
dev['model'] = device.model
|
|
if device.sw_version:
|
|
dev['sw_version'] = device.sw_version
|
|
|
|
return dev
|
|
|
|
|
|
@dataclasses.dataclass
class Setter:
    """Pairs a dataclass field with the callback that handles writes to it."""

    # Field descriptor, handed back to the callback as its first argument.
    field: dataclasses.Field

    # Called as callback(field, payload) when a command message arrives.
    callback: Any
|
|
|
|
|
|
class Client(Publisher):
    """Publishes entity state and discovery configuration over MQTT.

    For every entity the client publishes one JSON state message on
    ``janet/<uid>/state`` and one discovery config per field under
    ``homeassistant/<component>/<uid>_<field>/config``.  When a setter is
    supplied, ``janet/<uid>/<field>/set`` is subscribed and incoming payloads
    are forwarded to it.
    """

    def __init__(self, host: str, port: int = 1883):
        """Start connecting to the broker at ``host``:``port``.

        The connection is made with ``connect_async`` and the paho network
        loop is started right away, so the constructor does not wait for the
        broker.
        """
        # NOTE(review): Publisher.__init__ is not invoked, as in the original
        # code - confirm the base class needs no initialisation.
        # Guards _setters, which is touched from publish() and from the paho
        # callbacks.
        self._lock = threading.Lock()
        # Command topic -> setter registered for it.
        self._setters: Dict[str, Setter] = {}

        self._client = mqtt.Client()
        self._client.on_connect = self._on_connect
        self._client.on_message = self._on_message

        self._client.connect_async(host, port, 60)
        self._client.loop_start()

    def _on_connect(self, unused_client, unused_userdata, unused_flags, rc,
                    *unused_args):
        """Re-subscribe every registered command topic after a (re)connect.

        subscribe() calls made before the connection is up, or before a
        reconnect, do not survive, so they are replayed here.
        """
        if rc != 0:
            logger.warning(f'MQTT connection failed, rc={rc}')
            return
        with self._lock:
            commands = list(self._setters)
        for command in commands:
            self._client.subscribe(command)

    def _on_message(self, unused_client, unused_userdata,
                    message: mqtt.MQTTMessage):
        """Forward a command message to the setter registered for its topic."""
        with self._lock:
            setter = self._setters.get(message.topic, None)
        # Called outside the lock: the lock is not reentrant, so a callback
        # that publishes with a setter would otherwise deadlock.
        if setter is not None:
            setter.callback(setter.field, message.payload)

    def publish(self, device: Device, entity: Any, setter=None):
        """Publish the state of ``entity`` plus a discovery config per field.

        Args:
            device: The device the entity belongs to.  If it has an
                ``available`` attribute that is not None, ``ON``/``OFF`` is
                also published (retained) on ``janet/<uid>/power``.
            entity: A dataclass instance.  Fields whose value is None are
                skipped; fields that ``_to_state`` cannot convert are skipped
                with a warning.
            setter: Optional callable.  When given, the ``<field>/set`` topic
                of every published field is subscribed and incoming payloads
                are passed on as ``setter(field, payload)``.
        """
        uid = _uid(entity, device)
        base = f'janet/{uid}'
        state_topic = f'{base}/state'

        # Convert each field once; the result feeds both the state payload
        # and the per-field discovery configs below.
        converted = []
        for field in dataclasses.fields(entity):
            value = getattr(entity, field.name)
            converted.append((field, value, _to_state(value)))

        states = {
            field.name: state
            for field, _, state in converted
            if state is not None
        }
        self._client.publish(state_topic, json.dumps(states))

        available = getattr(device, 'available', None)
        if available is not None:
            self._client.publish(f'{base}/power',
                                 'ON' if available else 'OFF',
                                 retain=True)

        # Identical for every field of this entity, so built once.
        dev = _device(device)
        kind = entity.__class__.__name__

        for field, value, state in converted:
            if value is None:
                continue
            if state is None:
                logger.warning(
                    f'Ignoring field {field.name} type {field.type}')
                continue

            topic = f'{base}/{field.name}'
            if field.type in (bool, Optional[bool]):
                component = 'binary_sensor'
            else:
                component = 'sensor'
            config_topic = (
                f'homeassistant/{component}/{uid}_{field.name}/config')

            name = field.name.replace('_', ' ')
            config = {
                'name': f'{kind} {name}',
                'state_topic': state_topic,
                'value_template': f'{{{{value_json.{field.name}}}}}',
                'unique_id': f'{uid}_{field.name}',
                'device': dev,
                'expire_after': 600,
            }

            device_class = _to_device_class(value)
            if device_class:
                config['device_class'] = device_class

            units = _to_units(value)
            if units:
                config['unit_of_measurement'] = units

            if setter:
                command = f'{topic}/set'
                with self._lock:
                    # Subscribe right away when the topic is new; the
                    # on_connect handler repeats it if the broker was not
                    # reachable yet or the connection is later re-made.
                    if command not in self._setters:
                        self._client.subscribe(command)
                    self._setters[command] = Setter(field, setter)

            self._client.publish(config_topic, json.dumps(config))
|