vedirect/vedirect/hex.py

260 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import abc
import dataclasses
import enum
import logging
import struct
from typing import Iterator, List, Sequence, Union, Type, Optional
import pint
# Logger shared by everything in this module.
logger = logging.getLogger('vedirect.hex')

# Byte values of the two line delimiters.  Despite its name, _CR holds the
# line-feed character ('\n'); that is the byte that terminates a line.
_COLON = ord(':')
_CR = ord('\n')

# Maps the byte value of a hex digit ('0'-'9' and upper-case 'A'-'F') to the
# 4-bit number it encodes.  Lower-case hex digits are not included.
_NIBBLES = {
    ord(digit): value
    for value, digit in enumerate('0123456789ABCDEF')
}
class Mode(enum.IntEnum):
    """Access mode recorded on a ``Register``.

    NOTE(review): the member names suggest read / write / read-write access;
    nothing in this module interprets them, so confirm against the callers.
    """

    UNSET = 0
    R = 1
    W = 2
    # Numerically the combination of R and W (value 3).
    RW = R | W
@dataclasses.dataclass
class Register:
    """Static description of one device register.

    ``unpack`` reads ``kind``, ``scale`` and ``units`` from instances of this
    class to turn raw payload bytes into a Python value.
    """

    # Numeric register id (sent as a uint16 by ``get`` / ``set``).
    id: int
    # Human-readable register name.
    name: str
    # 'S' for a string register, otherwise a ``struct`` format code that is
    # decoded little-endian by ``unpack``.
    kind: str = ''
    # Access mode, when known.
    mode: Optional[Mode] = None
    # Multiplier applied to the raw number by ``unpack``; None means none.
    scale: Optional[float] = None
    # How ``unpack`` post-processes the number: an Enum class, ``bool``, or a
    # unit; None leaves the number untouched.
    units: Union[str, Type[bool], Type[enum.Enum], None] = None
def Field(reg: Register) -> dataclasses.Field:
    """Create a dataclass field that carries ``reg`` in its metadata.

    The field has no default value.  The register is stored under the
    metadata key ``'vedirect.Register'``.
    """
    metadata = {'vedirect.Register': reg}
    return dataclasses.field(default=dataclasses.MISSING, metadata=metadata)
class Command(enum.IntEnum):
    """Command codes that lead every request written by this module."""

    # A payload of 0x51FA51FA51FA51FA51FA will enable bootloader mode.
    ENTER_BOOT = 0x0

    # Presence check; the response is an Rsp ping containing the version and
    # the firmware type.
    PING = 0x1

    # Returns the firmware version, as stored in the header, in an Rsp Done
    # message.
    VERSION = 0x3

    # Returns the Product Id of the firmware, as stored in the header, in an
    # Rsp message.
    # NOTE(review): the original comment was cut off after "Rsp"; presumably
    # this is an Rsp Done message like VERSION -- confirm against the
    # protocol document.
    PRODUCT_ID = 0x4

    # Restarts the device; no response is sent.
    RESTART = 0x6

    # Returns a get response with the requested data, or an error.
    # Arguments: ID of the value to get (uint16) and flags (uint8, must be
    # zero).
    GET = 0x7

    # Sets a value.  Arguments: the value ID (uint16), flags (uint8, must be
    # zero), and a type-specific value.
    SET = 0x8

    # Asynchronous data message; should not be replied to.  Arguments: flags
    # (uint8) and a type-specific value.
    ASYNC = 0xA
class Response(enum.IntEnum):
    """Response codes; ``decoder`` maps the first nibble of a line to one."""

    DONE = 0x1
    UNKNOWN = 0x3
    ERROR = 0x4
    PING = 0x5
    GET = 0x7
    SET = 0x8
    ASYNC = 0xA
class Flags(enum.IntFlag):
    """Status bits that may be combined with ``|``.

    NOTE(review): presumably the flags byte carried by GET/SET responses;
    nothing in this module reads it, so confirm against the callers.
    """

    OK = 0
    UNKNOWN_ID = 1 << 0
    NOT_SUPPORTED = 1 << 1
    PARAMETER_ERROR = 1 << 2
class Port(abc.ABC):
    """Abstract byte transport used by the reader and writer helpers.

    Concrete subclasses must provide ``write``, ``in_waiting`` and ``read``.
    """

    @abc.abstractmethod
    def write(self, data: bytes) -> None:
        """Send ``data`` out of the port."""
        return None

    @property
    @abc.abstractmethod
    def in_waiting(self) -> int:
        """Byte count that ``_reader`` uses to size its next ``read``."""
        return 0

    @abc.abstractmethod
    def read(self, want: int) -> bytes:
        """Read from the port; ``want`` is the requested number of bytes."""
        return b''
class ProtocolError(RuntimeError):
    """Error type of this module; adds nothing to ``RuntimeError``."""
def _reader(src: Port) -> Iterator[int]:
    """Yield the bytes read from ``src`` one at a time, without end.

    Each round asks the port for ``in_waiting`` bytes, but never for fewer
    than one.  An empty read simply leads to another round.
    """
    while True:
        pending = src.in_waiting
        chunk = src.read(pending if pending > 1 else 1)
        for byte in chunk:
            yield byte
def _get_line(src: Port) -> Iterator[Sequence[int]]:
    """Yield each complete line read from a port as a list of nibbles.

    A line starts at a ':' and ends at the terminator byte ``_CR``.  Bytes
    seen before a ':' are skipped.  Every other byte must be a hex digit
    known to ``_NIBBLES`` and contributes one 4-bit value (0-15) to the line.

    A byte that is neither a hex digit, ':' nor the terminator causes the
    line collected so far to be dropped with a warning; reading resumes at
    the next ':'.

    A ':' inside a line restarts the line.  ':' is not a hex digit and is
    the start marker, so seeing one mid-line means the line being collected
    lost its terminator.  Keeping the stale nibbles would corrupt the line
    that follows as well.

    The generator never yields None and never finishes on its own; it waits
    inside the port reads performed by ``_reader``.
    """
    r = _reader(src)
    while True:
        # Skip everything up to and including the next start marker.
        while next(r) != _COLON:
            pass
        line: List[int] = []
        while True:
            ch = next(r)
            if ch == _CR:
                yield line
                break
            if ch == _COLON:
                if line:
                    logger.warning(
                        f'Unexpected start of line after {line!r}, dropping')
                # Rebind rather than clear: start the new line afresh.
                line = []
                continue
            nibble = _NIBBLES.get(ch)
            if nibble is None:
                logger.warning(
                    f'Invalid character {ch!r} in {line!r}, dropping')
                break
            line.append(nibble)
@dataclasses.dataclass(frozen=True)
class Frame:
    """One decoded frame, as yielded by ``decoder``.  Immutable."""

    # Command / response code taken from the first nibble of the line.
    command: int
    # Payload bytes with the trailing checksum byte already removed.
    data: bytes
def decoder(src: Port) -> Iterator[Frame]:
    """Read lines from the port and yield every frame that validates.

    Each line from ``_get_line`` is a sequence of nibbles: one command
    nibble followed by byte pairs, the last byte being the checksum.  A line
    is logged and skipped, never raised, when:

    * it has three nibbles or fewer,
    * it has an even number of nibbles (the bytes would not pair up),
    * command plus all bytes does not sum to 0x55 modulo 256, or
    * the command nibble is not a known ``Response`` code.

    The checksum is verified before the command nibble is converted, so a
    corrupted line is reported as a checksum error instead of raising
    ``ValueError`` out of the generator.

    Yields:
        ``Frame`` objects whose ``command`` is a ``Response`` member and
        whose ``data`` excludes the checksum byte.  None is never yielded.
    """
    for line in _get_line(src):
        if len(line) <= 3:
            logger.warning(f'Line is too short {line!r}')
            continue
        if len(line) % 2 != 1:
            logger.warning(
                f'Line should be an odd number of characters: {line!r}')
            continue
        code = line[0]
        # Pair up the remaining nibbles, high nibble first.
        data = bytes(
            (high << 4) + low for high, low in zip(line[1::2], line[2::2]))
        chk = (code + sum(data)) & 0xFF
        if chk != 0x55:
            logger.error(f'Checksum error sum={chk:x}')
            continue
        try:
            command = Response(code)
        except ValueError:
            # Valid checksum but a code this module does not know about.
            logger.warning(f'Unknown response code {code:X} in {line!r}')
            continue
        # Strip the checksum byte from the payload.
        yield Frame(command, data[:-1])
def get_uint32(data, offset: int) -> int:
    """Return the little-endian 32-bit value at ``data[offset:offset + 4]``.

    ``data`` may be any indexable sequence of ints, such as ``bytes``.
    Raises ``IndexError`` when fewer than four items are available.
    """
    value = 0
    for position in range(4):
        # Item ``position`` supplies bits 8*position .. 8*position + 7.
        value |= data[offset + position] << (8 * position)
    return value
def get_uint16(data, offset: int) -> int:
    """Return the little-endian 16-bit value at ``data[offset:offset + 2]``.

    ``data`` may be any indexable sequence of ints, such as ``bytes``.
    Raises ``IndexError`` when fewer than two items are available.
    """
    high = data[offset + 1]
    low = data[offset]
    return low | (high << 8)
def get_uint8(data, offset: int) -> int:
    """Return the item at ``data[offset]``.

    Exists for symmetry with ``get_uint16`` and ``get_uint32``.  Raises
    ``IndexError`` when ``offset`` is out of range.
    """
    byte = data[offset]
    return byte
class Packer:
    """Accumulates the byte values of an outgoing message.

    ``payload`` is a plain list of ints.  Every ``add*`` method returns the
    packer itself so calls can be chained.
    """

    def __init__(self):
        self.payload = []

    def add(self, ch: Union[int, bytes]) -> Packer:
        """Append one int, or every byte of a bytes-like value.

        ``bytes`` and ``bytearray`` are expanded into their individual byte
        values.  Anything else is appended unchanged as a single entry.
        """
        if isinstance(ch, (bytes, bytearray)):
            self.payload.extend(ch)
        else:
            self.payload.append(ch)
        return self

    def add_uint16(self, v: int) -> Packer:
        """Append the low 16 bits of ``v`` as two bytes, low byte first.

        Both bytes are masked to 0-255, so a negative ``v`` is encoded in
        two's complement and bits above bit 15 are discarded.  Without the
        mask such values would put a non-byte entry in the payload.
        """
        return self.add(v & 0xFF).add((v >> 8) & 0xFF)

    def add_uint32(self, v: int) -> Packer:
        """Append the low 32 bits of ``v`` as four bytes, low byte first.

        Masked like ``add_uint16``: negative values use two's complement
        and bits above bit 31 are discarded.
        """
        return self.add_uint16(v & 0xFFFF).add_uint16((v >> 16) & 0xFFFF)

    def sum(self) -> int:
        """Return the arithmetic sum of all payload entries."""
        return sum(self.payload)
def _issubclass(c, klass):
    """Like ``issubclass``, but answer False instead of raising TypeError.

    This lets callers probe values that may not be classes at all.
    """
    try:
        verdict = issubclass(c, klass)
    except TypeError:
        verdict = False
    return verdict
def unpack(register, value: bytes) -> Union[str, float, int]:
    """Convert the raw bytes of a register into a Python value.

    * ``register.kind == 'S'``: ``value`` is decoded as Latin-1 and cut at
      the first NUL, if any; the string is returned as is.
    * Otherwise ``kind`` is used as a little-endian ``struct`` format and
      the first unpacked item is taken.  When ``register.scale`` is set the
      number is multiplied by it and rounded to six decimals.

    ``register.units`` then selects the final conversion: an ``Enum``
    subclass is called with ``int(number)``, ``bool`` is called with the
    number, and a ``pint.Unit`` wraps the number in a ``pint.Quantity``.
    With any other ``units`` (including None) the number is returned as is.
    """
    kind = register.kind
    if kind == 'S':
        text = value.decode('LATIN-1')
        # Might be null-terminated.
        return text.partition('\0')[0]

    number = struct.unpack_from('<' + kind, value, 0)[0]

    scale = register.scale
    if scale is not None:
        number = round(number * scale, 6)

    units = register.units
    if units is None:
        return number
    if _issubclass(units, enum.Enum):
        return units(int(number))
    if units in (bool, ):
        return units(number)
    if isinstance(units, pint.Unit):
        return pint.Quantity(number, units)
    return number
def _send(port: Port, packer: Packer):
    """Append the checksum to ``packer`` and write the message to ``port``.

    The checksum byte is chosen so that all payload entries sum to 0x55
    modulo 256.  On the wire the message is ':' followed by the first
    payload entry as bare upper-case hex, every further entry as two
    upper-case hex digits, and a terminating '\\n'.

    Note that ``packer`` is modified: the checksum stays in its payload.
    """
    checksum = (0x55 - packer.sum()) & 0xFF
    packer.add(checksum)

    entries = packer.payload
    head, tail = entries[0], entries[1:]
    text = f':{head:X}' + ''.join('%02X' % entry for entry in tail)
    port.write(text.encode('LATIN-1') + b'\n')
def set(port: Port, register: int, value: bytes):
    """Write a SET request for ``register`` to ``port``.

    The message consists of the SET command code, the register id as a
    uint16, a zero flags byte and the already-encoded ``value`` bytes.  No
    reply is read here.
    """
    request = Packer()
    request.add(Command.SET.value)
    request.add_uint16(register)
    request.add(0)  # Flags byte.
    request.add(value)
    _send(port, request)
def get(port: Port, register: int) -> None:
    """Write a GET request for ``register`` to ``port``.

    The message consists of the GET command code, the register id as a
    uint16 and a zero flags byte.  No reply is read here.
    """
    request = Packer()
    request.add(Command.GET.value)
    request.add_uint16(register)
    request.add(0)  # Flags byte.
    _send(port, request)
def get_version(port: Port) -> None:
    """Write a VERSION request to ``port``; no reply is read here."""
    request = Packer()
    request.add(Command.VERSION.value)
    _send(port, request)
def get_product_id(port: Port) -> None:
    """Write a PRODUCT_ID request to ``port``; no reply is read here."""
    request = Packer()
    request.add(Command.PRODUCT_ID.value)
    _send(port, request)