nppilot: added an initial MSP decoder

master
Michael Hope 4 years ago
parent d5f5ac7413
commit ea18ed0913

@ -0,0 +1,268 @@
import dataclasses
import enum
import struct
from typing import List
import serial
class uint8(int):
def size():
return 'B'
class uint16(int):
def size():
return 'H'
class int16(int):
def size():
return 'h'
class decidegrees(float):
def size():
return 'h'
def scale(v):
return v * 0.1
class uint16list(list):
def consume(p) -> (bytes, list):
out = []
while len(p) >= 2:
out.append(struct.unpack('<H', p[:2])[0])
p = p[2:]
return p, out
class uint32(int):
def size():
return 'I'
class Message:
def encode(self) -> bytes:
values = []
fmt = '<'
for f in dataclasses.fields(self):
fmt += f.type.__dict__['size']()
values.append(getattr(self, f.name))
return struct.pack(fmt, *values)
def decode(self, payload):
p = bytes(payload)
for f in dataclasses.fields(self):
if 'consume' in f.type.__dict__:
p, v = f.type.__dict__['consume'](p)
setattr(self, f.name, v)
else:
fmt = '<' + f.type.__dict__['size']()
want = struct.calcsize(fmt)
v = struct.unpack(fmt, p[:want])[0]
if 'scale' in f.type.__dict__:
v = f.type.__dict__['scale'](v)
else:
v = f.type(v)
setattr(self, f.name, v)
p = p[want:]
if len(p):
print('dropped remaining %d' % len(p))
return self
@dataclasses.dataclass
class Ident(Message):
version: uint8 = 0
multitype: uint8 = 0
msp_version: uint8 = 0
capability: uint32 = 0
def command(self):
return 100
class ArmingDisabledFlag(enum.IntFlag):
NO_GYRO = 1 << 0
FAILSAFE = 1 << 1
RX_FAILSAFE = 1 << 2
BAD_RX_RECOVERY = 1 << 3
BOXFAILSAFE = 1 << 4
RUNAWAY_TAKEOFF = 1 << 5
THROTTLE = 1 << 6
ANGLE = 1 << 7
BOOT_GRACE_TIME = 1 << 8
NOPREARM = 1 << 9
LOAD = 1 << 10
CALIBRATING = 1 << 11
CLI = 1 << 12
CMS_MENU = 1 << 13
OSD_MENU = 1 << 14
BST = 1 << 15
MSP = 1 << 16
PARALYZE = 1 << 17
GPS = 1 << 18
ARM_SWITCH = 1 << 19
def size():
return 'I'
class SensorsFlag(enum.IntFlag):
ACC = 1 << 0
BARO = 1 << 1
MAG = 1 << 2
GPS = 1 << 3
RANGEFINDER = 1 << 4
GYRO = 1 << 5
def size():
return 'H'
class FlightModeFlag(enum.IntFlag):
NONE = 0
ANGLE = 1 << 0
HORIZON = 1 << 1
MAG = 1 << 2
BARO = 1 << 3
GPS_HOME = 1 << 4
GPS_HOLD = 1 << 5
HEADFREE = 1 << 6
PASSTHRU = 1 << 8
FAILSAFE = 1 << 10
GPS_RESCUE = 1 << 11
def size():
return 'I'
@dataclasses.dataclass
class Status(Message):
cycle_time: uint16 = 0
i2c_errors_count: uint16 = 0
sensors: SensorsFlag = 0
mode: FlightModeFlag = 0
profile: uint8 = 0
system_load_percent: uint16 = 0
gyro_cycle_time: uint16 = 0
flags_len: uint8 = 0
# flags
arming_disable_len: uint8 = 0
arming_disable_flags: ArmingDisabledFlag = 0
def command(self):
return 101
@dataclasses.dataclass
class Servo(Message):
servos: uint16list = None
def command(self):
return 103
@dataclasses.dataclass
class Attitude(Message):
roll: decidegrees = 0
pitch: decidegrees = 0
yaw: uint16 = 0
def command(self):
return 108
def decoder():
while True:
while (yield None) != ord('$'):
print('sync')
continue
while True:
ch = yield None
if ch != ord('M'):
print('not-m', ch)
break
if (yield None) != ord('>'):
print('not->')
break
size = yield None
command = yield None
payload = b''
chk = command ^ size
for ch in range(length):
payload += ch
ch ^= ch
got = yield None
if got != ch:
break
if (yield (command, payload)) != '$':
break
class Link:
def __init__(self, port):
self.port = port
self.buf = None
def request(self, msg):
self.send(msg.command())
def send(self, command, data=None):
if data is None:
data = b''
chk = len(data) ^ command
for ch in data:
chk ^= ch
self.write(b'$M<' + struct.pack('<BBB', len(data), command, chk))
def write(self, b):
self.port.write(b)
def get(self) -> int:
if not self.buf:
self.buf = self.port.read()
if not self.buf:
raise Exception('Read timeout')
ch, self.buf = self.buf[0], self.buf[1:]
return ch
def read(self):
while True:
if self.get() != ord('$'):
print('sync')
continue
if self.get() != ord('M'):
print('not-m')
continue
if self.get() != ord('>'):
print('not->')
continue
size = self.get()
command = self.get()
payload = []
for _ in range(size):
payload.append(self.get())
chk = command ^ size
for p in payload:
chk ^= p
if self.get() != chk:
print('chk')
continue
return command, payload
def main():
p = serial.Serial('/dev/ttyUSB0', 115200, timeout=1)
l = Link(p)
while True:
# for t in Status, Servo:
for t in Attitude, :
l.request(t())
command, payload = l.read()
print(t().decode(payload))
main()
Loading…
Cancel
Save