vedirect/vedirect/hex.py

260 lines
6.4 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
import abc
import dataclasses
import enum
import logging
2021-03-07 20:12:34 +01:00
import struct
from typing import Iterator, List, Sequence, Union, Type, Optional
2021-03-07 20:12:34 +01:00
import pint
logger = logging.getLogger('vedirect.hex')
_CR = ord('\n')
_COLON = ord(':')
_NIBBLES = {ord('0') + x: x for x in range(0, 10)}
_NIBBLES.update({ord('A') + x: 10 + x for x in range(0, 6)})
class Mode(enum.IntEnum):
UNSET = 0
R = 1
W = 2
RW = 3
@dataclasses.dataclass
class Register:
id: int
name: str
kind: str = ''
mode: Optional[Mode] = None
scale: Optional[float] = None
units: Union[str, Type[bool], Type[enum.Enum], None] = None
def Field(reg: Register) -> dataclasses.Field:
return dataclasses.field(default=dataclasses.MISSING,
metadata={'vedirect.Register': reg})
class Command(enum.IntEnum):
# 0x51FA51FA51FA51FA51FA as payload will enable bootloader mode.
ENTER_BOOT = 0
# Check for presence, the response is an Rsp ping containing version and
# firmware type.
PING = 1
# Returns the version of the firmware as stored in the header in an Rsp
# Done message.
VERSION = 3
# Returns the Product Id of the firmware as stored in the header in an
# Rsp,
PRODUCT_ID = 4
# Restarts the device, no response is sent.
RESTART = 6
# Returns a get response with the requested data or error is returned.
# Arguments are ID of the value to get (uint16) and flags (uint8, must be
# zero).
GET = 7
# Sets a value. Arguments are the value ID (uint16), flags (uint8, must be
# zero), and a type specific value.
SET = 8
# Asynchronous data message. Should not be replied. Arguments are flags
# (uint8) and a type specific value.
ASYNC = 0xA
class Response(enum.IntEnum):
DONE = 1
UNKNOWN = 3
ERROR = 4
PING = 5
GET = 7
SET = 8
ASYNC = 0xA
class Flags(enum.IntFlag):
OK = 0
UNKNOWN_ID = 1
NOT_SUPPORTED = 2
PARAMETER_ERROR = 4
class Port(abc.ABC):
@abc.abstractmethod
def write(self, data: bytes) -> None:
pass
@property
@abc.abstractmethod
def in_waiting(self) -> int:
return 0
@abc.abstractmethod
def read(self, want: int) -> bytes:
return b''
class ProtocolError(RuntimeError):
pass
def _reader(src: Port) -> Iterator[int]:
"""Generates bytes from a serial port."""
while True:
ready = max(1, src.in_waiting)
yield from src.read(ready)
def _get_line(src: Port) -> Iterator[Sequence[int]]:
"""Read a line from a port with None on idle."""
r = _reader(src)
while True:
while True:
ch = next(r)
if ch == _COLON:
break
line: List[int] = []
while True:
ch = next(r)
if ch == _CR:
yield line
break
elif ch == _COLON:
pass
else:
nibble = _NIBBLES.get(ch, None)
if nibble is None:
2021-03-07 20:12:34 +01:00
logger.warning(
f'Invalid character {ch!r} in {line!r}, dropping')
break
line.append(nibble)
@dataclasses.dataclass(frozen=True)
class Frame:
command: int
data: bytes
def decoder(src: Port) -> Iterator[Frame]:
"""Reads frames from the port yielding None on idle."""
for line in _get_line(src):
if len(line) <= 3:
logger.warning(f'Line is too short {line!r}')
continue
if len(line) % 2 != 1:
logger.warning(
f'Line should be an odd number of characters: {line!r}')
continue
command = Response(line[0])
data = bytes(
(line[x] << 4) + line[x + 1] for x in range(1, len(line), 2))
chk = (command + sum(data)) & 0xFF
if chk != 0x55:
logger.error(f'Checksum error sum={chk:x}')
continue
data = data[:-1]
yield Frame(command, data)
def get_uint32(data, offset: int) -> int:
return get_uint16(data, offset) | (get_uint16(data, offset + 2) << 16)
def get_uint16(data, offset: int) -> int:
return (data[offset + 1] << 8) | data[offset + 0]
def get_uint8(data, offset: int) -> int:
return data[offset]
class Packer:
def __init__(self):
self.payload = []
def add(self, ch: Union[int, bytes]) -> Packer:
if isinstance(ch, bytes):
self.payload.extend(ch)
else:
self.payload.append(ch)
return self
def add_uint16(self, v: int) -> Packer:
return self.add(v & 0xFF).add(v >> 8)
def add_uint32(self, v: int) -> Packer:
return self.add_uint16(v & 0xFFFF).add_uint16(v >> 16)
def sum(self) -> int:
return sum(self.payload)
2021-03-07 20:12:34 +01:00
def _issubclass(c, klass):
try:
return issubclass(c, klass)
except TypeError:
return False
def unpack(register, value: bytes) -> Union[str, float, int]:
"""Unpacks a register value."""
if register.kind == 'S':
data = value.decode('LATIN-1')
# Might be null-terminated.
return data.split('\0')[0]
data = struct.unpack_from('<' + register.kind, value, 0)[0]
if register.scale is not None:
data *= register.scale
data = round(data, 6)
if register.units is not None:
if _issubclass(register.units, enum.Enum):
data = register.units(int(data))
elif register.units in (bool, ):
data = register.units(data)
elif isinstance(register.units, pint.Unit):
data = pint.Quantity(data, register.units)
return data
def _send(port: Port, packer: Packer):
packer.add((0x55 - packer.sum()) & 0xFF)
payload = packer.payload
out = f':{payload[0]:X}' + ''.join('%02X' % x for x in payload[1:])
port.write(out.encode('LATIN-1') + b'\n')
def set(port: Port, register: int, value: bytes):
packer = Packer().add(
Command.SET.value).add_uint16(register).add(0).add(value)
_send(port, packer)
def get(port: Port, register: int) -> None:
packer = Packer().add(Command.GET.value).add_uint16(register).add(0)
_send(port, packer)
def get_version(port: Port) -> None:
packer = Packer().add(Command.VERSION.value)
_send(port, packer)
def get_product_id(port: Port) -> None:
packer = Packer().add(Command.PRODUCT_ID.value)
_send(port, packer)