# Source: vedirect/vedirect/mqtt.py (97 lines, 3 KiB, Python)

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exports fields over MQTT with discovery."""
import enum
import json
import time
from typing import Dict, Optional
import paho.mqtt.client as mqtt
import pint
from . import defs
_UNITS = {
'volt': ('V', 'voltage'),
'ampere': ('A', 'current'),
'watt': ('W', 'power'),
'hour * watt': ('Wh', 'energy'),
}
class Exporter:
    """Publishes decoded fields to an MQTT broker, with discovery configs.

    The first call to `export` publishes one retained
    `homeassistant/sensor/victron_<serial>_<field>/config` message per field.
    Field values are then published to `tele/victron_<serial>/<field>`, at
    most once every `_EXPORT_INTERVAL_S` seconds.
    """

    # Minimum number of seconds between two rounds of state messages.
    _EXPORT_INTERVAL_S = 60
    # Value sent as `expire_after` in every discovery config.
    _EXPIRE_AFTER_S = 600

    def __init__(self, host: str, port: int = 1883):
        """Creates the MQTT client and starts its background network loop.

        Args:
            host: Hostname or address of the MQTT broker.
            port: TCP port of the MQTT broker.
        """
        # `time.monotonic()` reading taken at the last export; None until
        # the first export has happened.
        self._last = None  # type: Optional[float]
        self._client = mqtt.Client()
        # Non-blocking connect (third argument is the keepalive, in seconds);
        # the connection itself is driven by the loop thread started below.
        self._client.connect_async(host, port, 60)
        self._client.loop_start()

    def _config(self, ser: str, fields: dict) -> None:
        """Publishes a retained discovery config message for every field.

        Args:
            ser: Device serial number; used as the device id and embedded in
                topic names and unique ids.
            fields: Mapping of field label to decoded value.

        Raises:
            KeyError: If a label is missing from `defs.FIELD_MAP`, if the
                product id is missing from `defs.PIDS`, or if the product id
                field is present but the firmware field is not.
        """
        for label, value in fields.items():
            field = defs.FIELD_MAP[label]
            slug = field.label.replace('#', '').lower()

            device = {
                'ids': [ser],
            }  # type: Dict[str, object]
            if field == defs.PID:
                # Only the product-id field's config carries the full device
                # description; every other config refers to the device by id.
                # NOTE(review): presumably the consumer merges device info by
                # id - confirm against the discovery documentation.
                device.update({
                    'manufacturer': 'Victron',
                    'model': defs.PIDS[int(value, 16)],
                    'name': f'Victron {ser}',
                    'sw_version': fields[defs.FW.label],
                })

            config = {
                'name': f'Victron {label}',
                'state_topic': f'tele/victron_{ser}/{slug}',
                'unique_id': f'victron_{ser}_{slug}',
                'expire_after': self._EXPIRE_AFTER_S,
                'device': device,
            }
            if isinstance(value, pint.Quantity):
                unit = str(value.units)
                # Unknown units are published under their pint name, with no
                # device class.
                unit, klass = _UNITS.get(unit, (unit, None))
                config['unit_of_measurement'] = unit
                if klass:
                    config['device_class'] = klass

            self._client.publish(
                f'homeassistant/sensor/victron_{ser}_{slug}/config',
                json.dumps(config),
                retain=True)

    @staticmethod
    def _payload(value):
        """Converts a decoded field value into an MQTT payload."""
        if isinstance(value, pint.Quantity):
            # Magnitude only, rounded to 3 decimal places.
            return round(value.m, 3)
        if isinstance(value, enum.IntEnum):
            return int(value)
        return str(value)

    def export(self, fields: dict) -> None:
        """Publishes the current field values, rate-limited.

        The first call also publishes the discovery configs. Later calls
        made less than `_EXPORT_INTERVAL_S` seconds after the previous
        export return without publishing anything.

        Args:
            fields: Mapping of field label to decoded value; must contain
                the serial number field (`defs.SER.label`).

        Raises:
            KeyError: If the serial number field is missing, or for the
                reasons listed on `_config` during the first call.
        """
        ser = fields[defs.SER.label]

        # Rate-limit on the monotonic clock: a wall-clock step backwards
        # (e.g. an NTP correction) would otherwise make the elapsed time
        # negative and silence exports until the clock caught up again.
        if self._last is None:
            self._config(ser, fields)
        elif (time.monotonic() - self._last) < self._EXPORT_INTERVAL_S:
            return
        self._last = time.monotonic()

        for label, value in fields.items():
            name = label.replace('#', '').lower()
            topic = f'tele/victron_{ser}/{name}'
            self._client.publish(topic, self._payload(value))