260 lines
6.4 KiB
Python
260 lines
6.4 KiB
Python
from __future__ import annotations
|
||
|
||
import abc
|
||
import dataclasses
|
||
import enum
|
||
import logging
|
||
import struct
|
||
from typing import Iterator, List, Sequence, Union, Type, Optional
|
||
|
||
import pint
|
||
|
||
logger = logging.getLogger('vedirect.hex')

# Byte that terminates a line.  NOTE: despite its name this is a line feed
# ('\n'), not a carriage return.
_CR = ord('\n')
# Byte that marks the start of a line.
_COLON = ord(':')

# Maps the byte value of an upper-case hexadecimal digit to its numeric
# value (0-15).  Lower-case digits are not part of the table.
_NIBBLES = {
    ord(digit): value for value, digit in enumerate('0123456789ABCDEF')
}
|
||
|
||
|
||
class Mode(enum.IntEnum):
    """Access mode of a register.

    The member names suggest read / write access (presumably R = readable,
    W = writable); RW is the combination of both bits.
    """

    UNSET = 0
    R = 1
    W = 2
    RW = R | W
|
||
|
||
|
||
@dataclasses.dataclass
class Register:
    """Describes one device register and how its raw value is interpreted."""

    # Numeric register identifier.
    id: int
    # Human readable register name.
    name: str
    # Value encoding: 'S' for a string, otherwise a ``struct`` format
    # character (this is how ``unpack`` in this module consumes it).
    kind: str = ''
    # Access mode, if known.
    mode: Optional[Mode] = None
    # Multiplier applied to the raw numeric value, if any.
    scale: Optional[float] = None
    # How to present the value: an enum class, ``bool``, or a unit.
    units: Union[str, Type[bool], Type[enum.Enum], None] = None
|
||
|
||
|
||
def Field(reg: Register) -> dataclasses.Field:
    """Create a dataclass field that carries ``reg`` in its metadata.

    The register is stored under the metadata key ``'vedirect.Register'``.
    The field has no default value.
    """
    tagged = {'vedirect.Register': reg}
    return dataclasses.field(default=dataclasses.MISSING, metadata=tagged)
|
||
|
||
|
||
class Command(enum.IntEnum):
    """Command codes sent to the device (first nibble of an outgoing line)."""

    # Enter bootloader mode; requires 0x51FA51FA51FA51FA51FA as payload.
    ENTER_BOOT = 0x0
    # Presence check. The device answers with an 'Rsp ping' containing the
    # version and firmware type.
    PING = 0x1
    # Ask for the firmware version as stored in the header; answered with
    # an 'Rsp Done' message.
    VERSION = 0x3
    # Ask for the Product Id of the firmware as stored in the header.
    # NOTE(review): the original comment was cut off after "in an 'Rsp,";
    # presumably an 'Rsp Done' message like VERSION - confirm with the spec.
    PRODUCT_ID = 0x4
    # Restart the device. No response is sent.
    RESTART = 0x6
    # Read a value: answered with a get response carrying the data, or an
    # error. Arguments: value ID (uint16) and flags (uint8, must be zero).
    GET = 0x7
    # Write a value. Arguments: value ID (uint16), flags (uint8, must be
    # zero) and a type specific value.
    SET = 0x8
    # Asynchronous data message; should not be replied to. Arguments: flags
    # (uint8) and a type specific value.
    ASYNC = 0xA
|
||
|
||
|
||
class Response(enum.IntEnum):
    """Response codes (first nibble of a line received from the device)."""

    DONE = 0x1
    UNKNOWN = 0x3
    ERROR = 0x4
    PING = 0x5
    GET = 0x7
    SET = 0x8
    ASYNC = 0xA
|
||
|
||
|
||
class Flags(enum.IntFlag):
    """Status bits; ``OK`` means no bit is set.

    Presumably the flags byte carried by get/set responses - confirm
    against the protocol specification.
    """

    OK = 0
    UNKNOWN_ID = 1 << 0
    NOT_SUPPORTED = 1 << 1
    PARAMETER_ERROR = 1 << 2
|
||
|
||
|
||
class Port(abc.ABC):
    """Abstract byte-oriented port used to talk to a device.

    Concrete subclasses must provide ``write``, ``in_waiting`` and ``read``.
    """

    @abc.abstractmethod
    def write(self, data: bytes) -> None:
        """Write ``data`` to the port."""

    @property
    @abc.abstractmethod
    def in_waiting(self) -> int:
        """Number of bytes ready to be read.

        Presumably mirrors pyserial's ``in_waiting`` - confirm.
        """
        return 0

    @abc.abstractmethod
    def read(self, want: int) -> bytes:
        """Read from the port; ``want`` is the requested byte count."""
        return b''
|
||
|
||
|
||
class ProtocolError(RuntimeError):
    """Error type for protocol-level failures.

    No raise site is visible in this part of the file; the purpose is
    inferred from the name.
    """
|
||
|
||
|
||
def _reader(src: Port) -> Iterator[int]:
    """Yield the bytes read from ``src`` one integer at a time, forever.

    Each round asks the port for everything it reports as waiting, but
    always for at least one byte.
    """
    while True:
        pending = src.in_waiting
        chunk = src.read(max(1, pending))
        yield from chunk
|
||
|
||
|
||
def _get_line(src: Port) -> Iterator[Sequence[int]]:
    """Yield the lines read from ``src`` as lists of nibble values.

    A line starts after a ':' and ends at '\\n'.  Everything before the ':'
    is skipped.  Each character in between must be an upper-case hex digit
    and is converted to its value (0-15).  A line containing any other
    character is logged and dropped.  This generator never yields None and
    never terminates on its own.
    """
    stream = _reader(src)
    # ``None`` while hunting for the start marker, otherwise the nibbles
    # collected so far for the current line.
    nibbles: Optional[List[int]] = None

    while True:
        ch = next(stream)

        if nibbles is None:
            if ch == _COLON:
                nibbles = []
            continue

        if ch == _CR:
            yield nibbles
            nibbles = None
        elif ch == _COLON:
            # NOTE(review): a ':' inside a line is ignored rather than
            # restarting the line - confirm this is intended.
            continue
        else:
            value = _NIBBLES.get(ch, None)
            if value is None:
                logger.warning(
                    f'Invalid character {ch!r} in {nibbles!r}, dropping')
                nibbles = None
            else:
                nibbles.append(value)
|
||
|
||
|
||
@dataclasses.dataclass(frozen=True)
class Frame:
    """One decoded frame: a command/response code plus its payload bytes."""

    # Code taken from the first nibble of the line.
    command: int
    # Payload bytes (the decoder strips the trailing checksum byte).
    data: bytes
|
||
|
||
|
||
def decoder(src: Port) -> Iterator[Frame]:
    """Decode the frames arriving on ``src``.

    Every line delivered by ``_get_line`` is a list of nibbles: one
    response-code nibble followed by pairs of nibbles forming bytes, the
    last of which is the checksum.  A frame is valid when the code plus all
    bytes sum to 0x55 modulo 256.

    Lines that are too short, contain an even number of nibbles, fail the
    checksum, or carry a code that is not a ``Response`` member are logged
    and skipped, so a single bad line never terminates the generator.

    Yields:
        ``Frame`` objects holding the ``Response`` code and the payload
        bytes without the checksum.  Nothing is yielded while the port is
        idle.
    """
    for line in _get_line(src):
        if len(line) <= 3:
            logger.warning(f'Line is too short {line!r}')
            continue

        if len(line) % 2 != 1:
            logger.warning(
                f'Line should be an odd number of characters: {line!r}')
            continue

        code = line[0]
        # Combine the remaining nibbles pairwise, high nibble first.
        data = bytes(
            (high << 4) + low for high, low in zip(line[1::2], line[2::2]))
        chk = (code + sum(data)) & 0xFF
        if chk != 0x55:
            logger.error(f'Checksum error sum={chk:x}')
            continue

        # Convert only after the checksum passed.  An unknown code must not
        # raise out of the generator, because that would end decoding for
        # good.
        try:
            command = Response(code)
        except ValueError:
            logger.warning(f'Unknown response code {code:X} in {line!r}')
            continue

        # Drop the trailing checksum byte.
        yield Frame(command, data[:-1])
|
||
|
||
|
||
def get_uint32(data, offset: int) -> int:
    """Return the little-endian 32-bit value at ``data[offset:offset + 4]``.

    ``data`` is any indexable sequence of integers (e.g. ``bytes``).
    """
    value = 0
    # Fold from the most significant byte down to the least significant.
    for index in (3, 2, 1, 0):
        value = (value << 8) | data[offset + index]
    return value
|
||
|
||
|
||
def get_uint16(data, offset: int) -> int:
    """Return the little-endian 16-bit value at ``data[offset:offset + 2]``.

    ``data`` is any indexable sequence of integers (e.g. ``bytes``).
    """
    low = data[offset]
    high = data[offset + 1]
    return low | (high << 8)
|
||
|
||
|
||
def get_uint8(data, offset: int) -> int:
    """Return the single item stored at ``data[offset]``."""
    byte = data[offset]
    return byte
|
||
|
||
|
||
class Packer:
    """Collects the integer items that make up an outgoing frame.

    All ``add*`` methods return the packer itself so calls can be chained.
    ``payload`` is the list of items added so far, in order.
    """

    def __init__(self):
        self.payload: List[int] = []

    def add(self, ch: Union[int, bytes]) -> Packer:
        """Append one integer, or every byte of a bytes-like value.

        Integers are stored as given; callers are responsible for keeping
        them in the range the frame encoding expects.
        """
        if isinstance(ch, (bytes, bytearray)):
            self.payload.extend(ch)
        else:
            self.payload.append(ch)
        return self

    def add_uint16(self, v: int) -> Packer:
        """Append ``v`` as two bytes, least significant first.

        Only the low 16 bits are used, so both bytes always stay within
        0-255: negative values are encoded in two's complement and larger
        values are truncated instead of producing a malformed frame.
        """
        return self.add(v & 0xFF).add((v >> 8) & 0xFF)

    def add_uint32(self, v: int) -> Packer:
        """Append ``v`` as four bytes, least significant first.

        Only the low 32 bits are used (see ``add_uint16``).
        """
        return self.add_uint16(v & 0xFFFF).add_uint16((v >> 16) & 0xFFFF)

    def sum(self) -> int:
        """Return the sum of all items added so far."""
        return sum(self.payload)
|
||
|
||
|
||
def _issubclass(c, klass):
    """``issubclass`` that answers False instead of raising ``TypeError``.

    This makes it safe to call with a first argument that is not a class.
    """
    try:
        result = issubclass(c, klass)
    except TypeError:
        result = False
    return result
|
||
|
||
|
||
def unpack(register, value: bytes) -> Union[str, float, int]:
    """Convert the raw bytes of a register into a Python value.

    Args:
        register: object with ``kind``, ``scale`` and ``units`` attributes.
        value: raw little-endian payload of the register.

    Returns:
        For kind ``'S'`` the Latin-1 decoded text up to the first NUL.
        Otherwise the first item unpacked with ``'<' + register.kind``,
        multiplied by ``register.scale`` and rounded to 6 decimals when a
        scale is set, then wrapped according to ``register.units``: an enum
        class gets the enum member, ``bool`` gets a bool, a ``pint.Unit``
        gets a ``pint.Quantity``.  Any other ``units`` leaves the number
        as is.
    """
    if register.kind == 'S':
        text = value.decode('LATIN-1')
        # The string may be NUL-terminated; keep only what precedes it.
        return text.split('\0')[0]

    fmt = '<' + register.kind
    (data,) = struct.unpack_from(fmt, value, 0)

    if register.scale is not None:
        data = round(data * register.scale, 6)

    units = register.units
    if units is None:
        return data
    if _issubclass(units, enum.Enum):
        return units(int(data))
    if units in (bool, ):
        return units(data)
    if isinstance(units, pint.Unit):
        return pint.Quantity(data, units)
    return data
|
||
|
||
|
||
def _send(port: Port, packer: Packer):
    """Finish the frame held by ``packer`` and write it to ``port``.

    A checksum byte is appended to the packer so that all items sum to 0x55
    modulo 256.  The frame is written as ':' followed by the first item as
    a hex number without padding, every further item as two upper-case hex
    digits, and a terminating '\\n'.
    """
    checksum = (0x55 - packer.sum()) & 0xFF
    packer.add(checksum)

    items = packer.payload
    head = f':{items[0]:X}'
    tail = ''.join('%02X' % item for item in items[1:])
    port.write((head + tail).encode('LATIN-1') + b'\n')
|
||
|
||
|
||
def set(port: Port, register: int, value: bytes):
    """Send a SET command for ``register`` carrying ``value``.

    The frame holds the command code, the register id as uint16, a zero
    flags byte and the raw value bytes.  Only the request is written; no
    response is read here.
    """
    packer = Packer()
    packer.add(Command.SET.value)
    packer.add_uint16(register)
    packer.add(0)  # flags
    packer.add(value)
    _send(port, packer)
|
||
|
||
|
||
def get(port: Port, register: int) -> None:
    """Send a GET command for ``register``.

    The frame holds the command code, the register id as uint16 and a zero
    flags byte.  Only the request is written; no response is read here.
    """
    packer = Packer()
    packer.add(Command.GET.value)
    packer.add_uint16(register)
    packer.add(0)  # flags
    _send(port, packer)
|
||
|
||
|
||
def get_version(port: Port) -> None:
    """Send a VERSION command; the response is not read here."""
    request = Packer()
    request.add(Command.VERSION.value)
    _send(port, request)
|
||
|
||
|
||
def get_product_id(port: Port) -> None:
    """Send a PRODUCT_ID command; the response is not read here."""
    request = Packer()
    request.add(Command.PRODUCT_ID.value)
    _send(port, request)
|