vedirect/vedirect/cli.py
Michael Hope 23c0a49a26
Some checks failed
continuous-integration/drone/push Build is failing
vedirect: use the systemd watchdog
2021-05-09 14:20:10 +02:00

267 lines
8.1 KiB
Python

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import enum
import logging
import queue
import struct
import pprint
import threading
import time
from dataclasses import dataclass
from importlib import metadata
from typing import Any, List, Union
import click
import janet
import janet.mqtt
import janet.prometheus
import prometheus_client
import serial
import systemd.daemon
from . import hex, phoenix
from .hex import Register
from . import mppt
logger = logging.getLogger('vedirect.cli')
class Echo(janet.Publisher):
def publish(self, device: janet.Device, entity: Any, unused_setter):
pprint.pprint(dataclasses.asdict(entity))
@dataclass
class Frame:
received: int
@dataclass
class Get:
register: int
data: bytes
@dataclass
class Done:
data: bytes
def _issubclass(typ, cls) -> bool:
try:
return issubclass(typ, cls)
except TypeError:
return False
class Poller:
def __init__(self, port: serial.Serial):
self._port = port
self._rx: 'queue.Queue[Union[Get,Done]]' = queue.Queue(100)
self._notify = threading.Condition()
self._lock = threading.Lock()
self._received = 0
def _discover(self):
while True:
with self._lock:
hex.get_product_id(self._port)
try:
resp = self._rx.get(timeout=1)
except queue.Empty:
continue
if not isinstance(resp, Done):
continue
pid = struct.unpack('<H', resp.data)[0]
for product in phoenix.PRODUCT_IDS:
if pid == product.id:
logger.info(f'Discovered a {product.name}')
return phoenix.GROUPS
if pid in mppt.PRODUCT_IDS:
logger.info('Discovered a %s' % mppt.PRODUCT_IDS[pid])
return mppt.GROUPS
raise RuntimeError(f'Unrecognised product ID {pid:X}')
def poll(self):
threading.Thread(target=self._getter).start()
groups = self._discover()
while True:
for group in groups:
values = {}
for field in dataclasses.fields(group):
register = field.metadata['vedirect.Register']
with self._lock:
hex.get(self._port, register.id)
while True:
try:
resp = self._rx.get(timeout=1)
if not isinstance(resp, Get):
continue
if resp.register == register.id:
values[field.name] = hex.unpack(
register, resp.data)
break
except queue.Empty:
logger.warning(
f'No response on {field.name}, skipping')
values[field.name] = None
break
yield group(**values)
yield Frame(self._received)
with self._notify:
self._notify.wait(60)
def set(self, field: dataclasses.Field, value: bytes) -> None:
value = value.decode('LATIN-1')
if _issubclass(field.type, enum.IntEnum):
try:
v = field.type[value.upper()]
except (KeyError, ValueError) as ex:
names = ' '.join(x.name.lower() for x in field.type)
logger.error(
f'Invalid enum value {value}, should be one of {names}',
exc_info=ex)
return
elif field.type in (float, int):
try:
v = float(value)
except ValueError as ex:
logger.warning(
(f'Value {value!r} for {field.name} is not a number, '
'dropping'),
exc_info=ex)
return
else:
logger.warning(f'Unsupported field type {field.type} '
f'for {field.name}, dropping')
return
if 'vedirect.Register' not in field.metadata:
logger.error(
f'Field {field.name} is not a MPPT register, dropping')
return
register: Register = field.metadata['vedirect.Register']
if register.scale is not None:
v /= register.scale
v = int(round(v))
logger.warning(f'set {field.name}={v}')
payload = struct.pack('<' + register.kind, v)
with self._lock:
hex.set(self._port, register.id, payload)
# TODO(michaelh): hack to give the set time to propagate.
time.sleep(0.5)
with self._notify:
self._notify.notify()
def _getter(self):
for frame in hex.decoder(self._port):
self._received += 1
if frame.command == hex.Response.GET:
self._get(frame)
elif frame.command == hex.Response.DONE:
self._done(frame)
elif frame.command == hex.Response.ASYNC:
pass
else:
logger.warning(f'Dropped {frame}')
def _get(self, frame):
register, flags = struct.unpack_from('<HB', frame.data, 0)
flags = hex.Flags(flags)
value = frame.data[3:]
if flags != hex.Flags.OK:
logger.warning(f'GET on {register:X} gave error {flags!r}')
return
self._rx.put(Get(register, value))
def _done(self, frame):
self._rx.put(Done(frame.data))
@click.command()
@click.option('--port',
type=str,
required=True,
help='Serial port connected to the controller')
@click.option('--prometheus-port',
type=int,
help='If supplied, export metrics on this port')
@click.option('--mqtt-host',
help='If supplied, export metrics to this MQTT host')
@click.option('--echo',
is_flag=True,
help='If supplied, echo metrics to stdout')
def app(port: str, prometheus_port: int, mqtt_host: str, echo: bool):
logging.basicConfig(format='%(name)-12s %(levelname)-8s %(message)s',
level=logging.INFO)
s = serial.serial_for_url(port, baudrate=19200, timeout=0.7)
publishers: List[janet.Publisher] = []
if prometheus_port:
prometheus_client.start_http_server(prometheus_port)
publishers.append(janet.prometheus.Client())
info = prometheus_client.Gauge('vedirect_info', 'Tool information',
('version', ))
info.labels(metadata.version('vedirect')).set(1)
if mqtt_host:
publishers.append(janet.mqtt.Client(mqtt_host))
if echo:
publishers.append(Echo())
device = None
poller = Poller(s)
for g in poller.poll():
if isinstance(g, phoenix.Information) or isinstance(
g, mppt.Information):
if not g.serial_number:
continue
device = janet.Device(identifiers=[g.serial_number],
available=True,
model=g.model_name,
name=f'VE.Direct {g.serial_number}',
manufacturer='Vicron',
kind='vedirect')
systemd.daemon.notify('WATCHDOG=1')
if device is not None:
for publisher in publishers:
publisher.publish(device, g, poller.set)
if __name__ == '__main__':
app()