janet/janet/mqtt.py

201 lines
5.8 KiB
Python
Raw Permalink Normal View History

import dataclasses
import datetime
import enum
2021-02-14 17:31:28 +01:00
import hashlib
import json
import logging
import re
2021-02-26 16:46:54 +01:00
import threading
from typing import Any, Dict, Optional, Union
2021-02-14 17:31:28 +01:00
import paho.mqtt.client as mqtt
import pint
2021-02-14 17:31:28 +01:00
2021-02-26 16:46:54 +01:00
from . import Device, Publisher
2021-02-14 17:31:28 +01:00
# Module logger; the name is spelled out rather than derived from __name__.
logger = logging.getLogger('janet.mqtt')

# Application-wide pint registry; its unit attributes are the keys of the
# lookup tables in this module.
_ureg = pint.get_application_registry()

# pint unit -> device-class string, consulted by _to_device_class().
_DEVICE_CLASSES = {
    _ureg.amp: 'current',
    _ureg.volt: 'voltage',
    _ureg.watt: 'power',
    _ureg.degC: 'temperature',
    _ureg.watt_hour: 'energy',
}
2021-02-27 10:31:51 +01:00
# pint unit -> unit-of-measurement string, consulted by _to_units().
_UNITS = {
    # Electrical quantities.
    _ureg.amp: 'A',
    _ureg.volt: 'V',
    _ureg.watt: 'W',
    # Temperature and energy.
    _ureg.degC: '°C',
    _ureg.watt_hour: 'Wh',
    # Time and frequency.
    _ureg.second: 's',
    _ureg.hertz: 'Hz',
    _ureg.hour: 'h',
}
2021-02-25 12:39:25 +01:00
def _to_state(value: Any) -> Union[float, str, None]:
    """Convert a field value into something JSON-serialisable for a state payload.

    Returns:
        * the magnitude rounded to 6 decimals for a ``pint.Quantity``;
        * the ISO-8601 string for a ``datetime.datetime``;
        * the member name for an ``enum.IntEnum``;
        * ``'ON'`` / ``'OFF'`` for a ``bool``;
        * the value itself for a ``str`` or ``int``;
        * the value rounded to 6 decimals for a ``float``;
        * ``None`` for anything else (the caller treats that as "skip").
    """
    if isinstance(value, pint.Quantity):
        return round(value.m, 6)
    if isinstance(value, datetime.datetime):
        return value.isoformat()

    # IntEnum and bool are both int subclasses, so they have to be recognised
    # before the generic int case below.
    if isinstance(value, enum.IntEnum):
        return value.name
    if isinstance(value, bool):
        return 'ON' if value else 'OFF'

    if isinstance(value, (str, int)):
        return value
    if isinstance(value, float):
        return round(value, 6)
    return None
def _to_device_class(value) -> Optional[str]:
    """Return the device-class string for ``value``, or ``None`` if there is none.

    A ``pint.Quantity`` is looked up by its units in ``_DEVICE_CLASSES`` (a
    miss is logged at debug level); a ``datetime.datetime`` always yields
    ``'timestamp'``; every other type yields ``None``.
    """
    if isinstance(value, pint.Quantity):
        wanted = value.units
        found = next(
            (name for unit, name in _DEVICE_CLASSES.items() if unit == wanted),
            None,
        )
        if found is None:
            logger.debug(f'No device class for units {value.units}')
        return found
    if isinstance(value, datetime.datetime):
        return 'timestamp'
    return None
2021-02-14 17:31:28 +01:00
2021-02-27 10:31:51 +01:00
def _to_units(value) -> Optional[str]:
    """Return the unit-of-measurement string for ``value``, or ``None``.

    A ``pint.Quantity`` is looked up by its units in ``_UNITS`` (a miss is
    logged at info level); a ``datetime.datetime`` yields ``'s'``; every other
    type yields ``None``.
    """
    if isinstance(value, pint.Quantity):
        wanted = value.units
        symbol = next(
            (text for unit, text in _UNITS.items() if unit == wanted),
            None,
        )
        if symbol is None:
            logger.info(f'Unrecognised units {value.units}')
        return symbol
    if isinstance(value, datetime.datetime):
        return 's'
    return None
2021-02-14 17:31:28 +01:00
def _uid(entity, device: Device) -> str:
    """Build the identifier used in MQTT topics for ``entity`` on ``device``.

    The identifier has the form ``<kind>_<uid>`` when ``entity`` is the device
    itself, and ``<kind>_<uid>/<entity class name>`` (lower-cased, with ``.``
    replaced by ``_``) otherwise.

    Args:
        entity: The object being published; only its class and its equality
            with ``device`` are used.
        device: The owning device. ``device.kind`` supplies ``<kind>``
            (falling back to the entity class's module name when empty) and
            ``device.identifiers`` supplies ``<uid>``.

    Returns:
        The identifier string.
    """
    cls = entity.__class__
    kind = (device.kind if device else None) or cls.__module__

    identifiers = device.identifiers
    id0 = identifiers[0] if len(identifiers) >= 1 else None
    # Use fullmatch rather than match(r'^\w+$'): `$` also matches just before
    # a trailing newline, which would let an identifier such as 'abc\n'
    # through and put a newline into the topic name.
    if id0 and re.fullmatch(r'\w+', id0):
        uid = id0
    else:
        # Anything that is not a plain word is replaced by a short, stable
        # digest of the whole identifier list.
        uid = hashlib.sha1(str(identifiers).encode()).hexdigest()[:13]

    if entity == device:
        return f'{kind}_{uid}'
    return f'{kind}_{uid}/{cls.__name__}'.lower().replace('.', '_')
2021-02-25 11:46:02 +01:00
def _device(device: Device) -> dict:
    """Describe ``device`` as a plain dict for inclusion in a config payload.

    ``identifiers`` and ``name`` are always present; ``manufacturer``,
    ``model`` and ``sw_version`` are included only when they are truthy.
    """
    info = {'identifiers': device.identifiers, 'name': device.name}
    for attr in ('manufacturer', 'model', 'sw_version'):
        setting = getattr(device, attr)
        if setting:
            info[attr] = setting
    return info
2021-02-26 16:46:54 +01:00
@dataclasses.dataclass
class Setter:
    """A registered command handler: a dataclass field plus its callback."""

    # The dataclass field the command applies to; passed to the callback as
    # its first argument.
    field: dataclasses.Field
    # Invoked as ``callback(field, payload)`` when a message arrives on the
    # command topic this Setter is registered under.
    callback: Any
class Client(Publisher):
    """Publisher that sends dataclass entities to an MQTT broker.

    For each entity it publishes a JSON state document under
    ``janet/<uid>/state`` and one discovery config per field under
    ``homeassistant/(binary_)sensor/<uid>_<field>/config``. When a setter
    callback is supplied it also listens on ``janet/<uid>/<field>/set``.
    """

    def __init__(self, host: str, port: int = 1883):
        """Start connecting to the broker at ``host:port`` in the background.

        The connection is established asynchronously by paho's network loop
        (``connect_async`` + ``loop_start``), so this returns before the
        broker has necessarily been reached.
        """
        # Guards _setters, which is read from paho's callbacks and written by
        # publish().
        self._lock = threading.Lock()
        self._setters: Dict[str, Setter] = {}

        self._client = mqtt.Client()
        # Subscriptions are (re)issued from on_connect: a subscribe() made
        # before the connection is up is not delivered, and subscriptions are
        # not restored automatically after a reconnect.
        self._client.on_connect = self._on_connect
        self._client.on_message = self._on_message
        self._client.connect_async(host, port, 60)
        self._client.loop_start()

    def _on_connect(self, unused_client, unused_userdata, unused_flags, rc,
                    *unused_extra):
        """Subscribe to every registered command topic after (re)connecting."""
        if rc != 0:
            logger.warning(f'MQTT connection failed (rc={rc})')
            return
        # Snapshot under the lock, subscribe outside it, so our lock is never
        # held while calling into the paho client.
        with self._lock:
            commands = list(self._setters)
        for command in commands:
            self._client.subscribe(command)

    def _on_message(self, unused_client, unused_userdata,
                    message: mqtt.MQTTMessage):
        """Dispatch an incoming message to the setter registered for its topic."""
        with self._lock:
            setter = self._setters.get(message.topic, None)
        # Run the callback after releasing the lock: the lock is not
        # re-entrant, and a callback that calls publish() with a setter would
        # otherwise block forever trying to take it again.
        if setter is not None:
            setter.callback(setter.field, message.payload)

    def _register_setter(self, command: str, field: dataclasses.Field,
                         callback: Any):
        """Record ``callback`` for ``command`` and subscribe on first use."""
        with self._lock:
            is_new = command not in self._setters
            self._setters[command] = Setter(field, callback)
        # Registered first, subscribed second: if this subscribe is dropped
        # because the client is not connected yet, _on_connect will pick the
        # topic up from _setters.
        if is_new:
            self._client.subscribe(command)

    def publish(self, device: Device, entity: Any, setter=None):
        """Publish the state and per-field discovery config of ``entity``.

        Args:
            device: The device the entity belongs to. If it has a non-None
                ``available`` attribute, that is published (retained) as
                ``ON``/``OFF`` on ``janet/<uid>/power``.
            entity: A dataclass instance. Fields that are ``None`` are
                skipped silently; fields whose value cannot be converted by
                ``_to_state`` are skipped with a warning.
            setter: Optional callable. When truthy, it is registered for the
                ``.../<field>/set`` topic of every published field and later
                invoked as ``setter(field, payload)``.
        """
        uid = _uid(entity, device)
        base = f'janet/{uid}'
        state_topic = f'{base}/state'

        # Convert each field exactly once; the result feeds both the state
        # document and the discovery loop below.
        states = {}
        published = []
        for field in dataclasses.fields(entity):
            value = getattr(entity, field.name)
            if value is None:
                continue
            state = _to_state(value)
            if state is None:
                logger.warning(
                    f'Ignoring field {field.name} type {field.type}')
                continue
            states[field.name] = state
            published.append((field, value))

        self._client.publish(state_topic, json.dumps(states))

        available = getattr(device, 'available', None)
        if available is not None:
            self._client.publish(f'{base}/power',
                                 'ON' if available else 'OFF',
                                 retain=True)

        # Identical for every field, so built once.
        dev = _device(device)
        kind = entity.__class__.__name__

        for field, value in published:
            if field.type in (bool, Optional[bool]):
                component = 'binary_sensor'
            else:
                component = 'sensor'
            config_topic = (
                f'homeassistant/{component}/{uid}_{field.name}/config')

            name = field.name.replace('_', ' ')
            config = {
                'name': f'{kind} {name}',
                'state_topic': state_topic,
                # Renders as {{value_json.<field>}}: a template that picks
                # this field out of the shared JSON state document.
                'value_template': f'{{{{value_json.{field.name}}}}}',
                'unique_id': f'{uid}_{field.name}',
                'device': dev,
                'expire_after': 600,
            }

            device_class = _to_device_class(value)
            if device_class:
                config['device_class'] = device_class

            units = _to_units(value)
            if units:
                config['unit_of_measurement'] = units

            if setter:
                self._register_setter(f'{base}/{field.name}/set', field,
                                      setter)

            self._client.publish(config_topic, json.dumps(config))