2021-02-25 11:45:08 +01:00
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import abc
|
|
|
|
|
import dataclasses
|
|
|
|
|
import enum
|
|
|
|
|
import logging
|
2021-03-07 20:12:34 +01:00
|
|
|
|
import struct
|
2021-03-21 13:40:58 +01:00
|
|
|
|
from typing import Iterator, List, Sequence, Union, Type, Optional
|
2021-02-25 11:45:08 +01:00
|
|
|
|
|
2021-03-07 20:12:34 +01:00
|
|
|
|
import pint
|
|
|
|
|
|
2021-02-25 11:45:08 +01:00
|
|
|
|
logger = logging.getLogger('vedirect.hex')

# Byte that terminates a frame. NOTE: despite the name this is the line feed
# character ('\n', 0x0A), not a carriage return.
_CR = ord('\n')
# Byte that marks the start of a frame.
_COLON = ord(':')

# Maps the byte value of each upper-case hexadecimal digit to its 4-bit
# value. Lower-case digits are deliberately absent, as in the two-step
# construction this replaces.
_NIBBLES = {
    ord(digit): value for value, digit in enumerate('0123456789ABCDEF')
}
|
|
|
|
|
|
|
|
|
|
|
2021-03-21 13:40:58 +01:00
|
|
|
|
class Mode(enum.IntEnum):
    """Mode of a register.

    NOTE(review): presumably the access mode (read / write); the values are
    laid out so that ``RW`` equals ``R | W`` - confirm against callers.
    """

    UNSET = 0b00
    R = 0b01
    W = 0b10
    RW = 0b11
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclasses.dataclass
class Register:
    """Description of a single device register.

    ``unpack`` reads ``kind``, ``scale`` and ``units`` to turn the raw bytes
    of a register into a Python value.
    """

    # Numeric identifier of the register.
    id: int
    # Human readable name.
    name: str
    # 'S' for a string register, otherwise a ``struct`` format code that
    # ``unpack`` decodes as little-endian.
    kind: str = ''
    # Optional mode of the register (see ``Mode``).
    mode: Optional[Mode] = None
    # Optional factor the decoded number is multiplied by.
    scale: Optional[float] = None
    # Optional conversion applied last: ``unpack`` handles an Enum class,
    # ``bool`` and ``pint.Unit`` instances; other values are ignored there.
    units: Union[str, Type[bool], Type[enum.Enum], None] = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def Field(reg: Register) -> dataclasses.Field:
    """Create a dataclass field that is tagged with a register.

    The field has no default value; ``reg`` is stored in the field metadata
    under the ``'vedirect.Register'`` key.
    """
    tagged = {'vedirect.Register': reg}
    return dataclasses.field(default=dataclasses.MISSING, metadata=tagged)
|
|
|
|
|
|
|
|
|
|
|
2021-02-25 11:45:08 +01:00
|
|
|
|
class Command(enum.IntEnum):
    """Command codes that can be sent to the device.

    ``set``, ``get`` and friends place the code first in the payload, and
    ``_send`` writes that first entry as the single hex digit that follows
    the ':' of a frame.
    """

    # Enter bootloader mode; requires 0x51FA51FA51FA51FA51FA as payload.
    ENTER_BOOT = 0x0

    # Presence check. The response is an 'Rsp ping' containing the version
    # and firmware type.
    PING = 0x1

    # Returns the firmware version, as stored in the header, in an
    # 'Rsp Done' message.
    VERSION = 0x3

    # Returns the Product Id of the firmware, as stored in the header, in an
    # 'Rsp Done' message.
    PRODUCT_ID = 0x4

    # Restarts the device; no response is sent.
    RESTART = 0x6

    # Returns a get response with the requested data, or an error.
    # Arguments: ID of the value to get (uint16) and flags (uint8, must be
    # zero).
    GET = 0x7

    # Sets a value. Arguments: the value ID (uint16), flags (uint8, must be
    # zero) and a type specific value.
    SET = 0x8

    # Asynchronous data message; should not be replied to. Arguments: flags
    # (uint8) and a type specific value.
    ASYNC = 0xA
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Response(enum.IntEnum):
    """Response codes received from the device.

    ``decoder`` converts the first nibble of every received line into one of
    these members.
    """

    DONE = 0x1
    UNKNOWN = 0x3
    ERROR = 0x4
    PING = 0x5
    GET = 0x7
    SET = 0x8
    ASYNC = 0xA
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Flags(enum.IntFlag):
    """Status bits; ``OK`` means that no bit is set.

    NOTE(review): presumably the flags byte carried by GET/SET responses -
    nothing in the visible code uses this type, confirm against callers.
    """

    OK = 0
    UNKNOWN_ID = 1 << 0
    NOT_SUPPORTED = 1 << 1
    PARAMETER_ERROR = 1 << 2
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Port(abc.ABC):
    """Abstract byte port that the protocol code reads from and writes to.

    Concrete ports must implement ``write``, ``in_waiting`` and ``read``.
    """

    @abc.abstractmethod
    def write(self, data: bytes) -> None:
        """Send ``data`` out of the port."""

    @property
    @abc.abstractmethod
    def in_waiting(self) -> int:
        """Count of bytes ready to read; ``_reader`` sizes its reads with it.

        The base implementation reports 0.
        """
        return 0

    @abc.abstractmethod
    def read(self, want: int) -> bytes:
        """Read bytes from the port, ``want`` being the requested count.

        The base implementation returns no data.
        """
        return b''
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ProtocolError(RuntimeError):
    """Error type for protocol failures.

    NOTE(review): nothing in the visible code raises this; presumably it is
    raised by callers - confirm.
    """
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _reader(src: Port) -> Iterator[int]:
    """Endlessly yield the bytes read from a port, one integer at a time.

    Each round asks the port for everything it reports as waiting, but
    never for fewer than one byte.
    """
    while True:
        pending = src.in_waiting
        chunk = src.read(pending if pending > 1 else 1)
        for byte in chunk:
            yield byte
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_line(src: Port) -> Iterator[Sequence[int]]:
    """Yield the nibbles of every frame-like line received on a port.

    A line starts at a ':' and ends at a newline. Each upper-case hex digit
    in between contributes one 4-bit value to the yielded list; neither the
    ':' nor the newline is included. The generator never finishes and never
    yields ``None``: it simply blocks in the port while there is no data.

    Recovery rules:

    * Bytes outside a line are skipped silently.
    * A ':' inside a line restarts the line. Otherwise a frame that lost its
      newline would be glued onto the next frame and both would be lost.
    * Any other non-hex byte drops the line with a warning, and scanning
      resumes at the next ':'.

    Args:
        src: The port to read from.

    Yields:
        A new list of nibble values (0-15) for each complete line.
    """
    r = _reader(src)

    while True:
        # Hunt for the start of the next frame.
        while next(r) != _COLON:
            pass

        line: List[int] = []
        while True:
            ch = next(r)
            if ch == _CR:
                yield line
                break

            if ch == _COLON:
                # A new frame begins before the current one was terminated.
                if line:
                    logger.warning(
                        f'Unexpected start of frame after {line!r}, '
                        'restarting')
                    line = []
                continue

            nibble = _NIBBLES.get(ch)
            if nibble is None:
                logger.warning(
                    f'Invalid character {ch!r} in {line!r}, dropping')
                break

            line.append(nibble)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
class Frame:
    """Immutable frame as produced by ``decoder``."""

    # Code taken from the first nibble of the line (``decoder`` stores a
    # ``Response`` member here).
    command: int
    # Payload bytes, without the trailing checksum byte.
    data: bytes
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def decoder(src: Port) -> Iterator[Frame]:
    """Decode the frames received on a port.

    Pulls nibble lines from ``_get_line`` and yields one ``Frame`` per valid
    line. The generator never finishes and never yields ``None``; it blocks
    in the port while there is no data.

    Invalid lines are logged and skipped rather than raised, so a single
    corrupted line cannot terminate the generator. A line is skipped when:

    * it has three nibbles or fewer,
    * it has an even number of nibbles (a frame is one command nibble
      followed by whole bytes),
    * its checksum is wrong, or
    * its command nibble is not a known ``Response``.

    Args:
        src: The port to read from.

    Yields:
        Frames whose ``command`` is a ``Response`` member and whose ``data``
        excludes the trailing checksum byte.
    """
    for line in _get_line(src):
        if len(line) <= 3:
            logger.warning(f'Line is too short {line!r}')
            continue

        if len(line) % 2 != 1:
            logger.warning(
                f'Line should be an odd number of characters: {line!r}')
            continue

        code = line[0]
        data = bytes(
            (line[x] << 4) + line[x + 1] for x in range(1, len(line), 2))

        # The command nibble plus every byte, checksum included, must add up
        # to 0x55 modulo 256.
        chk = (code + sum(data)) & 0xFF
        if chk != 0x55:
            logger.error(f'Checksum error sum={chk:x}')
            continue

        # Converted only after the checksum passed: an unknown code makes
        # Response() raise ValueError, which must not escape the generator.
        try:
            command = Response(code)
        except ValueError:
            logger.warning(f'Unknown response {code:X} in {line!r}')
            continue

        yield Frame(command, data[:-1])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_uint32(data, offset: int) -> int:
    """Return the little-endian 32-bit value at ``data[offset:offset + 4]``.

    Raises IndexError if ``data`` is too short.
    """
    return (data[offset]
            | (data[offset + 1] << 8)
            | (data[offset + 2] << 16)
            | (data[offset + 3] << 24))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_uint16(data, offset: int) -> int:
    """Return the little-endian 16-bit value at ``data[offset:offset + 2]``.

    Raises IndexError if ``data`` is too short.
    """
    high = data[offset + 1]
    low = data[offset]
    return low | (high << 8)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_uint8(data, offset: int) -> int:
    """Return the single item of ``data`` found at ``offset``.

    Raises IndexError if ``offset`` is out of range.
    """
    value = data[offset]
    return value
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Packer:
    """Collects the byte values that make up an outgoing frame.

    Every ``add*`` method returns the packer itself so that calls can be
    chained. Multi-byte integers are stored least significant byte first.
    """

    def __init__(self):
        # One integer per payload entry, in the order they were added.
        self.payload: List[int] = []

    def add(self, ch: Union[int, bytes]) -> Packer:
        """Append a single value, or every byte of a bytes-like value.

        A single integer is stored as given, without range checking.
        """
        if isinstance(ch, (bytes, bytearray)):
            self.payload.extend(ch)
        else:
            self.payload.append(ch)
        return self

    def add_uint16(self, v: int) -> Packer:
        """Append the low 16 bits of ``v`` as two bytes, low byte first.

        Both bytes are masked so that every stored entry fits in 0-255:
        negative values are stored in two's complement form and bits above
        the 16th are discarded, instead of producing an entry that cannot be
        written as two hex digits.
        """
        return self.add(v & 0xFF).add((v >> 8) & 0xFF)

    def add_uint32(self, v: int) -> Packer:
        """Append the low 32 bits of ``v`` as four bytes, low byte first.

        Masked in the same way as ``add_uint16``.
        """
        return self.add_uint16(v & 0xFFFF).add_uint16((v >> 16) & 0xFFFF)

    def sum(self) -> int:
        """Return the sum of all payload entries (used for the checksum)."""
        return sum(self.payload)
|
|
|
|
|
|
|
|
|
|
|
2021-03-07 20:12:34 +01:00
|
|
|
|
def _issubclass(c, klass):
    """Like ``issubclass``, but answers False instead of raising TypeError.

    This lets callers probe values that may not be classes at all.
    """
    try:
        answer = issubclass(c, klass)
    except TypeError:
        answer = False
    return answer
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def unpack(register, value: bytes) -> Union[str, float, int]:
    """Convert the raw bytes of a register into a Python value.

    The conversion is driven by three attributes of ``register``:

    * ``kind``: 'S' decodes ``value`` as LATIN-1 text cut at the first NUL;
      anything else is a ``struct`` format code that is decoded as
      little-endian from the start of ``value``, keeping the first item.
    * ``scale``: when set, the number is multiplied by it and rounded to six
      decimals.
    * ``units``: when set, an Enum class or ``bool`` is called with the
      number (truncated to ``int`` for enums), and a ``pint.Unit`` wraps it
      in a ``pint.Quantity``. Any other value leaves the number unchanged.
    """
    if register.kind == 'S':
        text = value.decode('LATIN-1')
        # The string might be null-terminated; keep what precedes the NUL.
        head, _, _ = text.partition('\0')
        return head

    layout = '<' + register.kind
    number = struct.unpack_from(layout, value, 0)[0]

    scale = register.scale
    if scale is not None:
        number = round(number * scale, 6)

    units = register.units
    if units is None:
        return number

    if _issubclass(units, enum.Enum):
        return units(int(number))
    if units in (bool, ):
        return units(number)
    if isinstance(units, pint.Unit):
        return pint.Quantity(number, units)
    return number
|
|
|
|
|
|
|
|
|
|
|
2021-02-25 11:45:08 +01:00
|
|
|
|
def _send(port: Port, packer: Packer):
    """Finish a frame with its checksum and write it to the port.

    The checksum byte is chosen so that all payload entries add up to 0x55
    modulo 256. It is appended to ``packer`` itself, so a packer must only
    be sent once. On the wire the first entry takes a single hex digit after
    the ':', every other entry takes two, and a newline ends the frame.
    """
    checksum = (0x55 - packer.sum()) & 0xFF
    packer.add(checksum)

    entries = packer.payload
    body = ''.join('%02X' % x for x in entries[1:])
    text = f':{entries[0]:X}' + body
    port.write(text.encode('LATIN-1') + b'\n')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set(port: Port, register: int, value: bytes):
    """Send a SET command that writes ``value`` to a register.

    The frame carries the register id as a uint16, a zero flags byte and
    then the raw ``value`` bytes. Nothing is read back here.
    """
    packer = Packer()
    packer.add(Command.SET.value)
    packer.add_uint16(register)
    packer.add(0)  # Flags.
    packer.add(value)
    _send(port, packer)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get(port: Port, register: int) -> None:
    """Send a GET command that requests the value of a register.

    The frame carries the register id as a uint16 followed by a zero flags
    byte. Nothing is read back here.
    """
    packer = Packer()
    packer.add(Command.GET.value)
    packer.add_uint16(register)
    packer.add(0)  # Flags.
    _send(port, packer)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_version(port: Port) -> None:
    """Send a VERSION command, which has no arguments.

    Nothing is read back here.
    """
    request = Packer()
    request.add(Command.VERSION.value)
    _send(port, request)
|
2021-03-21 13:40:58 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_product_id(port: Port) -> None:
    """Send a PRODUCT_ID command, which has no arguments.

    Nothing is read back here.
    """
    request = Packer()
    request.add(Command.PRODUCT_ID.value)
    _send(port, request)
|