from __future__ import annotations import abc import dataclasses import enum import logging import struct from typing import Iterator, List, Sequence, Union, Type, Optional import pint logger = logging.getLogger('vedirect.hex') _CR = ord('\n') _COLON = ord(':') _NIBBLES = {ord('0') + x: x for x in range(0, 10)} _NIBBLES.update({ord('A') + x: 10 + x for x in range(0, 6)}) class Mode(enum.IntEnum): UNSET = 0 R = 1 W = 2 RW = 3 @dataclasses.dataclass class Register: id: int name: str kind: str = '' mode: Optional[Mode] = None scale: Optional[float] = None units: Union[str, Type[bool], Type[enum.Enum], None] = None def Field(reg: Register) -> dataclasses.Field: return dataclasses.field(default=dataclasses.MISSING, metadata={'vedirect.Register': reg}) class Command(enum.IntEnum): # 0x51FA51FA51FA51FA51FA as payload will enable bootloader mode. ENTER_BOOT = 0 # Check for presence, the response is an ‘Rsp ping’ containing version and # firmware type. PING = 1 # Returns the version of the firmware as stored in the header in an ‘Rsp # Done’ message. VERSION = 3 # Returns the Product Id of the firmware as stored in the header in an # ‘Rsp, PRODUCT_ID = 4 # Restarts the device, no response is sent. RESTART = 6 # Returns a get response with the requested data or error is returned. # Arguments are ID of the value to get (uint16) and flags (uint8, must be # zero). GET = 7 # Sets a value. Arguments are the value ID (uint16), flags (uint8, must be # zero), and a type specific value. SET = 8 # Asynchronous data message. Should not be replied. Arguments are flags # (uint8) and a type specific value. ASYNC = 0xA class Response(enum.IntEnum): DONE = 1 UNKNOWN = 3 ERROR = 4 PING = 5 GET = 7 SET = 8 ASYNC = 0xA class Flags(enum.IntFlag): OK = 0 UNKNOWN_ID = 1 NOT_SUPPORTED = 2 PARAMETER_ERROR = 4 class Port(abc.ABC): @abc.abstractmethod def write(self, data: bytes) -> None: pass @property @abc.abstractmethod def in_waiting(self) -> int: return 0 @abc.abstractmethod def read(self, want: int) -> bytes: return b'' class ProtocolError(RuntimeError): pass def _reader(src: Port) -> Iterator[int]: """Generates bytes from a serial port.""" while True: ready = max(1, src.in_waiting) yield from src.read(ready) def _get_line(src: Port) -> Iterator[Sequence[int]]: """Read a line from a port with None on idle.""" r = _reader(src) while True: while True: ch = next(r) if ch == _COLON: break line: List[int] = [] while True: ch = next(r) if ch == _CR: yield line break elif ch == _COLON: pass else: nibble = _NIBBLES.get(ch, None) if nibble is None: logger.warning( f'Invalid character {ch!r} in {line!r}, dropping') break line.append(nibble) @dataclasses.dataclass(frozen=True) class Frame: command: int data: bytes def decoder(src: Port) -> Iterator[Frame]: """Reads frames from the port yielding None on idle.""" for line in _get_line(src): if len(line) <= 3: logger.warning(f'Line is too short {line!r}') continue if len(line) % 2 != 1: logger.warning( f'Line should be an odd number of characters: {line!r}') continue command = Response(line[0]) data = bytes( (line[x] << 4) + line[x + 1] for x in range(1, len(line), 2)) chk = (command + sum(data)) & 0xFF if chk != 0x55: logger.error(f'Checksum error sum={chk:x}') continue data = data[:-1] yield Frame(command, data) def get_uint32(data, offset: int) -> int: return get_uint16(data, offset) | (get_uint16(data, offset + 2) << 16) def get_uint16(data, offset: int) -> int: return (data[offset + 1] << 8) | data[offset + 0] def get_uint8(data, offset: int) -> int: return data[offset] class Packer: def __init__(self): self.payload = [] def add(self, ch: Union[int, bytes]) -> Packer: if isinstance(ch, bytes): self.payload.extend(ch) else: self.payload.append(ch) return self def add_uint16(self, v: int) -> Packer: return self.add(v & 0xFF).add(v >> 8) def add_uint32(self, v: int) -> Packer: return self.add_uint16(v & 0xFFFF).add_uint16(v >> 16) def sum(self) -> int: return sum(self.payload) def _issubclass(c, klass): try: return issubclass(c, klass) except TypeError: return False def unpack(register, value: bytes) -> Union[str, float, int]: """Unpacks a register value.""" if register.kind == 'S': data = value.decode('LATIN-1') # Might be null-terminated. return data.split('\0')[0] data = struct.unpack_from('<' + register.kind, value, 0)[0] if register.scale is not None: data *= register.scale data = round(data, 6) if register.units is not None: if _issubclass(register.units, enum.Enum): data = register.units(int(data)) elif register.units in (bool, ): data = register.units(data) elif isinstance(register.units, pint.Unit): data = pint.Quantity(data, register.units) return data def _send(port: Port, packer: Packer): packer.add((0x55 - packer.sum()) & 0xFF) payload = packer.payload out = f':{payload[0]:X}' + ''.join('%02X' % x for x in payload[1:]) port.write(out.encode('LATIN-1') + b'\n') def set(port: Port, register: int, value: bytes): packer = Packer().add( Command.SET.value).add_uint16(register).add(0).add(value) _send(port, packer) def get(port: Port, register: int) -> None: packer = Packer().add(Command.GET.value).add_uint16(register).add(0) _send(port, packer) def get_version(port: Port) -> None: packer = Packer().add(Command.VERSION.value) _send(port, packer) def get_product_id(port: Port) -> None: packer = Packer().add(Command.PRODUCT_ID.value) _send(port, packer)