# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import dataclasses import enum import logging import queue import struct import pprint import threading import time from dataclasses import dataclass from importlib import metadata from typing import Any, List, Union import click import janet import janet.mqtt import janet.prometheus import prometheus_client import serial import systemd.daemon from . import hex, phoenix from .hex import Register from . import mppt logger = logging.getLogger('vedirect.cli') class Echo(janet.Publisher): def publish(self, device: janet.Device, entity: Any, unused_setter): pprint.pprint(dataclasses.asdict(entity)) @dataclass class Frame: received: int @dataclass class Get: register: int data: bytes @dataclass class Done: data: bytes def _issubclass(typ, cls) -> bool: try: return issubclass(typ, cls) except TypeError: return False class Poller: def __init__(self, port: serial.Serial): self._port = port self._rx: 'queue.Queue[Union[Get,Done]]' = queue.Queue(100) self._notify = threading.Condition() self._lock = threading.Lock() self._received = 0 def _discover(self): while True: with self._lock: hex.get_product_id(self._port) try: resp = self._rx.get(timeout=1) except queue.Empty: continue if not isinstance(resp, Done): continue pid = struct.unpack(' None: value = value.decode('LATIN-1') if _issubclass(field.type, enum.IntEnum): try: v = field.type[value.upper()] except (KeyError, ValueError) as ex: names = ' '.join(x.name.lower() for x in field.type) logger.error( f'Invalid enum value {value}, should be one of {names}', exc_info=ex) return elif field.type in (float, int): try: v = float(value) except ValueError as ex: logger.warning( (f'Value {value!r} for {field.name} is not a number, ' 'dropping'), exc_info=ex) return else: logger.warning(f'Unsupported field type {field.type} ' f'for {field.name}, dropping') return if 'vedirect.Register' not in field.metadata: logger.error( f'Field {field.name} is not a MPPT register, dropping') return register: Register = field.metadata['vedirect.Register'] if register.scale is not None: v /= register.scale v = int(round(v)) logger.warning(f'set {field.name}={v}') payload = struct.pack('<' + register.kind, v) with self._lock: hex.set(self._port, register.id, payload) # TODO(michaelh): hack to give the set time to propagate. time.sleep(0.5) with self._notify: self._notify.notify() def _getter(self): for frame in hex.decoder(self._port): self._received += 1 if frame.command == hex.Response.GET: self._get(frame) elif frame.command == hex.Response.DONE: self._done(frame) elif frame.command == hex.Response.ASYNC: pass else: logger.warning(f'Dropped {frame}') def _get(self, frame): register, flags = struct.unpack_from('