janet/janet/mqtt.py
Michael Hope 97f7adb535
All checks were successful
continuous-integration/drone/push Build is passing
janet: switch the device class message to debug
2021-03-21 14:35:56 +01:00

201 lines
5.8 KiB
Python

import dataclasses
import datetime
import enum
import hashlib
import json
import logging
import re
import threading
from typing import Any, Dict, Optional, Union
import paho.mqtt.client as mqtt
import pint
from . import Device, Publisher
logger = logging.getLogger('janet.mqtt')
_ureg = pint.get_application_registry()
_DEVICE_CLASSES = {
_ureg.amp: 'current',
_ureg.volt: 'voltage',
_ureg.watt: 'power',
_ureg.degC: 'temperature',
_ureg.watt_hour: 'energy',
}
_UNITS = {
_ureg.amp: 'A',
_ureg.volt: 'V',
_ureg.watt: 'W',
_ureg.degC: '°C',
_ureg.watt_hour: 'Wh',
_ureg.second: 's',
_ureg.hertz: 'Hz',
_ureg.hour: 'h',
}
def _to_state(value: Any) -> Union[float, str, None]:
if isinstance(value, pint.Quantity):
return round(value.m, 6)
elif isinstance(value, datetime.datetime):
return value.isoformat()
elif isinstance(value, enum.IntEnum):
return value.name
elif isinstance(value, bool):
return 'ON' if value else 'OFF'
elif isinstance(value, str):
return value
elif isinstance(value, int):
return value
elif isinstance(value, float):
return round(value, 6)
else:
return None
def _to_device_class(value) -> Optional[str]:
if isinstance(value, pint.Quantity):
for unit, klass in _DEVICE_CLASSES.items():
if unit == value.units:
return klass
logger.debug(f'No device class for units {value.units}')
return None
elif isinstance(value, datetime.datetime):
return 'timestamp'
else:
return None
def _to_units(value) -> Optional[str]:
if isinstance(value, pint.Quantity):
for unit, klass in _UNITS.items():
if unit == value.units:
return klass
logger.info(f'Unrecognised units {value.units}')
return None
elif isinstance(value, datetime.datetime):
return 's'
else:
return None
def _uid(entity, device: Device) -> str:
cls = entity.__class__
kind = device.kind if device else None
kind = kind or cls.__module__
id0 = device.identifiers[0] if len(device.identifiers) >= 1 else None
if id0 and re.match(r'^\w+$', id0):
uid = id0
else:
uid = hashlib.sha1(str(device.identifiers).encode()).hexdigest()[:13]
if entity == device:
return f'{kind}_{uid}'
else:
return f'{kind}_{uid}/{cls.__name__}'.lower().replace('.', '_')
def _device(device: Device) -> dict:
dev = {
'identifiers': device.identifiers,
'name': device.name,
}
if device.manufacturer:
dev['manufacturer'] = device.manufacturer
if device.model:
dev['model'] = device.model
if device.sw_version:
dev['sw_version'] = device.sw_version
return dev
@dataclasses.dataclass
class Setter:
field: dataclasses.Field
callback: Any
class Client(Publisher):
def __init__(self, host: str, port: int = 1883):
self._lock = threading.Lock()
self._setters: Dict[str, Setter] = {}
self._client = mqtt.Client()
self._client.on_message = self._on_message
self._client.connect_async(host, port, 60)
self._client.loop_start()
def _on_message(self, unused_client, unused_userdata,
message: mqtt.MQTTMessage):
with self._lock:
setter = self._setters.get(message.topic, None)
if setter is not None:
setter.callback(setter.field, message.payload)
def publish(self, device: Device, entity: Any, setter=None):
uid = _uid(entity, device)
base = f'janet/{uid}'
state_topic = f'{base}/state'
states = {
name: _to_state(value)
for name, value in dataclasses.asdict(entity).items()
if _to_state(value) is not None
}
self._client.publish(state_topic, json.dumps(states))
available = getattr(device, 'available', None)
if available is not None:
self._client.publish(f'{base}/power',
'ON' if available else 'OFF',
retain=True)
for field in dataclasses.fields(entity):
value = getattr(entity, field.name)
if value is None:
continue
state = _to_state(value)
if state is None:
logger.warning(
f'Ignoring field {field.name} type {field.type}')
continue
topic = f'{base}/{field.name}'
if field.type in (bool, Optional[bool]):
config_topic = (
f'homeassistant/binary_sensor/{uid}_{field.name}/config')
else:
config_topic = (
f'homeassistant/sensor/{uid}_{field.name}/config')
dev = _device(device)
kind = entity.__class__.__name__
name = field.name.replace('_', ' ')
config = {
'name': f'{kind} {name}',
'state_topic': state_topic,
'value_template': f'{{{{value_json.{field.name}}}}}',
'unique_id': f'{uid}_{field.name}',
'device': dev,
'expire_after': 600,
}
device_class = _to_device_class(value)
if device_class:
config['device_class'] = device_class
units = _to_units(value)
if units:
config['unit_of_measurement'] = units
if setter:
command = f'{topic}/set'
with self._lock:
if command not in self._setters:
self._client.subscribe(command)
self._setters[command] = Setter(field, setter)
self._client.publish(config_topic, json.dumps(config))