97 lines
3 KiB
Python
97 lines
3 KiB
Python
# Copyright 2020 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Exports fields over MQTT with discovery."""
|
|
|
|
import enum
|
|
import json
|
|
import time
|
|
from typing import Dict, Optional
|
|
|
|
import paho.mqtt.client as mqtt
|
|
import pint
|
|
|
|
from . import defs
|
|
|
|
_UNITS = {
|
|
'volt': ('V', 'voltage'),
|
|
'ampere': ('A', 'current'),
|
|
'watt': ('W', 'power'),
|
|
'hour * watt': ('Wh', 'energy'),
|
|
}
|
|
|
|
|
|
class Exporter:
|
|
def __init__(self, host: str, port: int = 1883):
|
|
self._last = None # type: Optional[float]
|
|
|
|
self._client = mqtt.Client()
|
|
self._client.connect_async(host, port, 60)
|
|
self._client.loop_start()
|
|
|
|
def _config(self, ser: str, fields: dict) -> None:
|
|
for label, value in fields.items():
|
|
f = defs.FIELD_MAP[label]
|
|
labelc = f.label.replace('#', '').lower()
|
|
device = {
|
|
'ids': [ser],
|
|
} # type: Dict[str, object]
|
|
if f == defs.PID:
|
|
device.update({
|
|
'manufacturer': 'Victron',
|
|
'model': defs.PIDS[int(value, 16)],
|
|
'name': f'Victron {ser}',
|
|
'sw_version': fields[defs.FW.label],
|
|
})
|
|
|
|
config = {
|
|
'name': f'Victron {label}',
|
|
'state_topic': f'tele/victron_{ser}/{labelc}',
|
|
'unique_id': f'victron_{ser}_{labelc}',
|
|
'expire_after': 600,
|
|
'device': device,
|
|
}
|
|
if isinstance(value, pint.Quantity):
|
|
unit = str(value.units)
|
|
unit, klass = _UNITS.get(unit, (unit, None))
|
|
config['unit_of_measurement'] = unit
|
|
if klass:
|
|
config['device_class'] = klass
|
|
|
|
self._client.publish(
|
|
f'homeassistant/sensor/victron_{ser}_{labelc}/config',
|
|
json.dumps(config),
|
|
retain=True)
|
|
|
|
def export(self, fields: dict) -> None:
|
|
ser = fields[defs.SER.label]
|
|
if self._last is None:
|
|
self._config(ser, fields)
|
|
elif (time.time() - self._last) < 60:
|
|
return
|
|
|
|
self._last = time.time()
|
|
|
|
for label, value in fields.items():
|
|
name = label.replace('#', '').lower()
|
|
topic = f'tele/victron_{ser}/{name}'
|
|
|
|
if isinstance(value, pint.Quantity):
|
|
payload = round(value.m, 3)
|
|
elif isinstance(value, enum.IntEnum):
|
|
payload = int(value)
|
|
else:
|
|
payload = str(value)
|
|
|
|
self._client.publish(topic, payload)
|