267 lines
8.1 KiB
Python
267 lines
8.1 KiB
Python
# Copyright 2020 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import dataclasses
|
|
import enum
|
|
import logging
|
|
import queue
|
|
import struct
|
|
import pprint
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from importlib import metadata
|
|
from typing import Any, List, Union
|
|
|
|
import click
|
|
import janet
|
|
import janet.mqtt
|
|
import janet.prometheus
|
|
import prometheus_client
|
|
import serial
|
|
import systemd.daemon
|
|
|
|
from . import hex, phoenix
|
|
from .hex import Register
|
|
from . import mppt
|
|
|
|
# Module-level logger; every message from this file is emitted under the
# 'vedirect.cli' logger name.
logger = logging.getLogger('vedirect.cli')
|
|
|
|
|
|
class Echo(janet.Publisher):
    """Publisher that pretty-prints every entity to stdout."""

    def publish(self, device: janet.Device, entity: Any, unused_setter):
        """Pretty-prints `entity` as a dict.

        `device` and `unused_setter` are accepted but not used.
        """
        as_dict = dataclasses.asdict(entity)
        pprint.pprint(as_dict)
|
|
|
|
|
|
@dataclasses.dataclass
class Frame:
    """Marker emitted by the poller after each full pass over the groups."""

    # Running count of frames decoded from the port at the time of emission.
    received: int
|
|
|
|
|
|
@dataclasses.dataclass
class Get:
    """A successful GET response as queued by the reader thread."""

    # Register ID taken from the first two bytes of the response payload.
    register: int
    # Payload bytes that follow the register ID and the flags byte.
    data: bytes
|
|
|
|
|
|
@dataclasses.dataclass
class Done:
    """A DONE response as queued by the reader thread."""

    # Raw payload of the DONE frame.
    data: bytes
|
|
|
|
|
|
def _issubclass(typ, cls) -> bool:
|
|
try:
|
|
return issubclass(typ, cls)
|
|
except TypeError:
|
|
return False
|
|
|
|
|
|
class Poller:
    """Polls a device over a serial port and decodes its registers.

    A background reader thread decodes incoming frames and queues GET and
    DONE responses. `poll` sends requests from the calling thread and
    matches the queued responses against them. `set` may be called from
    another thread to write a register.
    """

    def __init__(self, port: serial.Serial):
        """Creates a poller that talks to the device on `port`."""
        self._port = port

        # Responses handed from the reader thread to the polling thread.
        self._rx: 'queue.Queue[Union[Get,Done]]' = queue.Queue(100)
        # Notified by set() to cut the wait between polling passes short.
        self._notify = threading.Condition()
        # Held while writing a request to the port.
        self._lock = threading.Lock()
        # Number of frames decoded so far by the reader thread.
        self._received = 0

    def _discover(self):
        """Identifies the attached product and returns its register groups.

        Requests the product ID once a second until a DONE response arrives.

        Returns:
            `phoenix.GROUPS` or `mppt.GROUPS`, depending on the product ID.

        Raises:
            RuntimeError: if the product ID is in neither product table.
        """
        while True:
            with self._lock:
                hex.get_product_id(self._port)
            try:
                resp = self._rx.get(timeout=1)
            except queue.Empty:
                continue

            if not isinstance(resp, Done):
                continue
            pid = struct.unpack('<H', resp.data)[0]

            for product in phoenix.PRODUCT_IDS:
                if pid == product.id:
                    logger.info(f'Discovered a {product.name}')
                    return phoenix.GROUPS

            if pid in mppt.PRODUCT_IDS:
                logger.info('Discovered a %s' % mppt.PRODUCT_IDS[pid])
                return mppt.GROUPS

            raise RuntimeError(f'Unrecognised product ID {pid:X}')

    def _read_register(self, field: dataclasses.Field) -> Any:
        """Requests the register behind `field` and returns its value.

        Returns:
            The value produced by `hex.unpack`, or None if no response
            arrives within one second.
        """
        register = field.metadata['vedirect.Register']

        with self._lock:
            hex.get(self._port, register.id)

        while True:
            try:
                resp = self._rx.get(timeout=1)
            except queue.Empty:
                logger.warning(f'No response on {field.name}, skipping')
                return None
            # Anything that is not the answer to this request (a DONE, or a
            # GET for another register) is discarded and the wait restarts.
            if isinstance(resp, Get) and resp.register == register.id:
                return hex.unpack(register, resp.data)

    def poll(self):
        """Yields decoded register groups from the device, forever.

        Starts the reader thread and discovers the product. Each pass then
        reads every register of every group and yields one instance per
        group (fields that timed out are None), followed by a `Frame`
        carrying the running frame count. Between passes it waits up to
        60 seconds, or until `set` notifies.

        Raises:
            RuntimeError: if the product ID is not recognised.
        """
        # Daemon thread: the reader blocks on the port indefinitely, so a
        # non-daemon thread would keep the process alive after this
        # generator's consumer has failed or stopped.
        threading.Thread(target=self._getter, daemon=True).start()

        groups = self._discover()

        while True:
            for group in groups:
                values = {}
                for field in dataclasses.fields(group):
                    values[field.name] = self._read_register(field)
                yield group(**values)

            yield Frame(self._received)

            with self._notify:
                self._notify.wait(60)

    def set(self, field: dataclasses.Field, value: bytes) -> None:
        """Writes `value` to the register behind `field`.

        Args:
            field: dataclass field whose metadata carries the
                'vedirect.Register' entry to write.
            value: LATIN-1 encoded text; an enum member name
                (case-insensitive) for IntEnum fields, or a number for
                int/float fields.

        Invalid, unsupported or unencodable values are logged and dropped
        rather than raised. After a successful write the polling loop is
        woken so the new state is read back promptly.
        """
        text = value.decode('LATIN-1')

        if _issubclass(field.type, enum.IntEnum):
            try:
                v = field.type[text.upper()]
            except (KeyError, ValueError) as ex:
                names = ' '.join(x.name.lower() for x in field.type)
                logger.error(
                    f'Invalid enum value {text}, should be one of {names}',
                    exc_info=ex)
                return
        elif field.type in (float, int):
            try:
                v = float(text)
            except ValueError as ex:
                logger.warning(
                    (f'Value {text!r} for {field.name} is not a number, '
                     'dropping'),
                    exc_info=ex)
                return
        else:
            logger.warning(f'Unsupported field type {field.type} '
                           f'for {field.name}, dropping')
            return

        if 'vedirect.Register' not in field.metadata:
            logger.error(
                f'Field {field.name} is not a MPPT register, dropping')
            return

        register: Register = field.metadata['vedirect.Register']

        if register.scale is not None:
            v /= register.scale

        try:
            # round() raises on nan/inf and struct.pack raises when the
            # value does not fit the register's format; both are bad input.
            v = int(round(v))
            payload = struct.pack('<' + register.kind, v)
        except (ValueError, OverflowError, struct.error) as ex:
            logger.warning(
                (f'Value {text!r} for {field.name} cannot be encoded, '
                 'dropping'),
                exc_info=ex)
            return
        logger.warning(f'set {field.name}={v}')

        with self._lock:
            hex.set(self._port, register.id, payload)
            # TODO(michaelh): hack to give the set time to propagate.
            time.sleep(0.5)

        with self._notify:
            self._notify.notify()

    def _getter(self):
        """Reader thread body: decodes frames and dispatches them by command.

        GET and DONE frames are queued for the polling thread, ASYNC frames
        are ignored, and anything else is logged and dropped.
        """
        for frame in hex.decoder(self._port):
            self._received += 1
            if frame.command == hex.Response.GET:
                self._get(frame)
            elif frame.command == hex.Response.DONE:
                self._done(frame)
            elif frame.command == hex.Response.ASYNC:
                pass
            else:
                logger.warning(f'Dropped {frame}')

    def _get(self, frame):
        """Queues a GET response unless it is malformed or reports an error."""
        try:
            register, flags = struct.unpack_from('<HB', frame.data, 0)
            flags = hex.Flags(flags)
        except (struct.error, ValueError) as ex:
            # A short or unparseable payload must not kill the reader
            # thread, otherwise no further response is ever received.
            logger.warning(f'Dropped malformed GET response {frame}',
                           exc_info=ex)
            return
        if flags != hex.Flags.OK:
            logger.warning(f'GET on {register:X} gave error {flags!r}')
            return
        # The value follows the 2-byte register ID and the flags byte.
        self._rx.put(Get(register, frame.data[3:]))

    def _done(self, frame):
        """Queues the payload of a DONE response."""
        self._rx.put(Done(frame.data))
|
|
|
|
|
|
@click.command()
@click.option('--port',
              type=str,
              required=True,
              help='Serial port connected to the controller')
@click.option('--prometheus-port',
              type=int,
              help='If supplied, export metrics on this port')
@click.option('--mqtt-host',
              help='If supplied, export metrics to this MQTT host')
@click.option('--echo',
              is_flag=True,
              help='If supplied, echo metrics to stdout')
def app(port: str, prometheus_port: int, mqtt_host: str, echo: bool):
    """Poll a VE.Direct device and publish its readings."""
    logging.basicConfig(format='%(name)-12s %(levelname)-8s %(message)s',
                        level=logging.INFO)

    s = serial.serial_for_url(port, baudrate=19200, timeout=0.7)
    publishers: List[janet.Publisher] = []

    if prometheus_port:
        prometheus_client.start_http_server(prometheus_port)
        publishers.append(janet.prometheus.Client())
        # Constant gauge whose label carries the installed package version.
        info = prometheus_client.Gauge('vedirect_info', 'Tool information',
                                       ('version', ))
        info.labels(metadata.version('vedirect')).set(1)

    if mqtt_host:
        publishers.append(janet.mqtt.Client(mqtt_host))

    if echo:
        publishers.append(Echo())

    # Nothing is published until an Information group has supplied a serial
    # number to identify the device with.
    device = None
    poller = Poller(s)
    for g in poller.poll():
        if isinstance(g, (phoenix.Information, mppt.Information)):
            if not g.serial_number:
                continue

            device = janet.Device(identifiers=[g.serial_number],
                                  available=True,
                                  model=g.model_name,
                                  name=f'VE.Direct {g.serial_number}',
                                  manufacturer='Victron',
                                  kind='vedirect')
            # Kick the systemd watchdog each time the device identifies
            # itself successfully.
            systemd.daemon.notify('WATCHDOG=1')

        if device is not None:
            for publisher in publishers:
                publisher.publish(device, g, poller.set)
|
|
|
|
|
|
# Run the click command when this module is executed as a script.
if __name__ == '__main__':
    app()
|