vedirect/vedirect/cli.py

267 lines
8.1 KiB
Python
Raw Permalink Normal View History

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import enum
import logging
2021-03-07 20:12:34 +01:00
import queue
import struct
import pprint
2021-03-07 20:12:34 +01:00
import threading
import time
2021-03-07 20:12:34 +01:00
from dataclasses import dataclass
from importlib import metadata
from typing import Any, List, Union
2021-03-07 20:12:34 +01:00
import click
import janet
import janet.mqtt
import janet.prometheus
2021-03-07 20:12:34 +01:00
import prometheus_client
import serial
2021-05-09 14:20:10 +02:00
import systemd.daemon
from . import hex, phoenix
from .hex import Register
from . import mppt
2021-03-07 20:12:34 +01:00
logger = logging.getLogger('vedirect.cli')
2021-03-07 20:12:34 +01:00
class Echo(janet.Publisher):
def publish(self, device: janet.Device, entity: Any, unused_setter):
pprint.pprint(dataclasses.asdict(entity))
2021-03-21 16:30:39 +01:00
@dataclass
class Frame:
received: int
@dataclass
class Get:
register: int
data: bytes
@dataclass
class Done:
data: bytes
def _issubclass(typ, cls) -> bool:
try:
return issubclass(typ, cls)
except TypeError:
return False
class Poller:
def __init__(self, port: serial.Serial):
self._port = port
2021-03-21 16:30:39 +01:00
self._rx: 'queue.Queue[Union[Get,Done]]' = queue.Queue(100)
self._notify = threading.Condition()
self._lock = threading.Lock()
2021-03-21 16:30:39 +01:00
self._received = 0
def _discover(self):
while True:
with self._lock:
hex.get_product_id(self._port)
try:
resp = self._rx.get(timeout=1)
except queue.Empty:
continue
if not isinstance(resp, Done):
continue
pid = struct.unpack('<H', resp.data)[0]
for product in phoenix.PRODUCT_IDS:
if pid == product.id:
logger.info(f'Discovered a {product.name}')
return phoenix.GROUPS
if pid in mppt.PRODUCT_IDS:
logger.info('Discovered a %s' % mppt.PRODUCT_IDS[pid])
return mppt.GROUPS
raise RuntimeError(f'Unrecognised product ID {pid:X}')
def poll(self):
threading.Thread(target=self._getter).start()
groups = self._discover()
while True:
for group in groups:
values = {}
for field in dataclasses.fields(group):
register = field.metadata['vedirect.Register']
with self._lock:
hex.get(self._port, register.id)
while True:
try:
resp = self._rx.get(timeout=1)
2021-05-09 14:20:10 +02:00
if not isinstance(resp, Get):
continue
if resp.register == register.id:
values[field.name] = hex.unpack(
register, resp.data)
break
except queue.Empty:
logger.warning(
f'No response on {field.name}, skipping')
values[field.name] = None
break
yield group(**values)
2021-03-21 16:30:39 +01:00
yield Frame(self._received)
with self._notify:
self._notify.wait(60)
def set(self, field: dataclasses.Field, value: bytes) -> None:
value = value.decode('LATIN-1')
if _issubclass(field.type, enum.IntEnum):
try:
v = field.type[value.upper()]
except (KeyError, ValueError) as ex:
names = ' '.join(x.name.lower() for x in field.type)
logger.error(
f'Invalid enum value {value}, should be one of {names}',
exc_info=ex)
return
elif field.type in (float, int):
try:
v = float(value)
except ValueError as ex:
logger.warning(
2021-03-21 16:30:39 +01:00
(f'Value {value!r} for {field.name} is not a number, '
'dropping'),
exc_info=ex)
return
else:
logger.warning(f'Unsupported field type {field.type} '
f'for {field.name}, dropping')
return
if 'vedirect.Register' not in field.metadata:
logger.error(
f'Field {field.name} is not a MPPT register, dropping')
return
register: Register = field.metadata['vedirect.Register']
if register.scale is not None:
v /= register.scale
v = int(round(v))
logger.warning(f'set {field.name}={v}')
payload = struct.pack('<' + register.kind, v)
with self._lock:
hex.set(self._port, register.id, payload)
# TODO(michaelh): hack to give the set time to propagate.
time.sleep(0.5)
with self._notify:
self._notify.notify()
def _getter(self):
for frame in hex.decoder(self._port):
2021-03-21 16:30:39 +01:00
self._received += 1
if frame.command == hex.Response.GET:
self._get(frame)
elif frame.command == hex.Response.DONE:
self._done(frame)
elif frame.command == hex.Response.ASYNC:
pass
else:
logger.warning(f'Dropped {frame}')
def _get(self, frame):
register, flags = struct.unpack_from('<HB', frame.data, 0)
flags = hex.Flags(flags)
value = frame.data[3:]
if flags != hex.Flags.OK:
logger.warning(f'GET on {register:X} gave error {flags!r}')
return
self._rx.put(Get(register, value))
def _done(self, frame):
self._rx.put(Done(frame.data))
@click.command()
@click.option('--port',
type=str,
required=True,
help='Serial port connected to the controller')
@click.option('--prometheus-port',
type=int,
help='If supplied, export metrics on this port')
@click.option('--mqtt-host',
help='If supplied, export metrics to this MQTT host')
@click.option('--echo',
is_flag=True,
help='If supplied, echo metrics to stdout')
def app(port: str, prometheus_port: int, mqtt_host: str, echo: bool):
2021-05-09 14:20:10 +02:00
logging.basicConfig(format='%(name)-12s %(levelname)-8s %(message)s',
level=logging.INFO)
s = serial.serial_for_url(port, baudrate=19200, timeout=0.7)
2021-03-07 20:12:34 +01:00
publishers: List[janet.Publisher] = []
if prometheus_port:
prometheus_client.start_http_server(prometheus_port)
publishers.append(janet.prometheus.Client())
info = prometheus_client.Gauge('vedirect_info', 'Tool information',
('version', ))
info.labels(metadata.version('vedirect')).set(1)
if mqtt_host:
publishers.append(janet.mqtt.Client(mqtt_host))
if echo:
publishers.append(Echo())
device = None
poller = Poller(s)
for g in poller.poll():
if isinstance(g, phoenix.Information) or isinstance(
g, mppt.Information):
if not g.serial_number:
continue
device = janet.Device(identifiers=[g.serial_number],
available=True,
model=g.model_name,
name=f'VE.Direct {g.serial_number}',
manufacturer='Vicron',
kind='vedirect')
2021-05-09 14:20:10 +02:00
systemd.daemon.notify('WATCHDOG=1')
if device is not None:
for publisher in publishers:
publisher.publish(device, g, poller.set)
if __name__ == '__main__':
app()