msp: can send RC, enter armed mode

master
Michael Hope 4 years ago
parent ea18ed0913
commit 629f732d0c
  1. 461
      nppilot/mspdef.py
  2. 90
      nppilot/msplink.py

@ -1,80 +1,113 @@
import dataclasses
import collections.abc
import enum
import struct
from typing import List
import serial
import dataclasses
class PrimitiveMixin:
def decode(cls, f, p: bytes) -> (bytes, object):
fmt = '<' + cls.size()
size = struct.calcsize(fmt)
vs = struct.unpack(fmt, p[:size])
p, v = p[size:], vs[0]
v = cls.convert(cls, f, v)
return p, v
def convert(cls, f, v):
if f is not None:
return f.type(v)
return v
class Primitive(PrimitiveMixin, int):
pass
class uint8(int):
class uint8(Primitive):
def size():
return 'B'
class uint16(int):
class int8(Primitive):
def size():
return 'b'
class uint16(Primitive):
def size():
return 'H'
class int16(int):
class int16(Primitive):
def size():
return 'h'
class decidegrees(float):
class uint32(Primitive):
def size():
return 'I'
class decidegrees(Primitive):
def size():
return 'h'
def scale(v):
def convert(cls, f, v):
return v * 0.1
class uint16list(list):
def consume(p) -> (bytes, list):
def _decode(into, p):
for f in dataclasses.fields(into):
try:
p, v = f.type.decode(f.type, f, p)
setattr(into, f.name, v)
except Exception as ex:
print('ignoring error on', f.name, ex)
return p, into
def decode(into, payload):
p = bytes(payload)
p, v = _decode(into, p)
if len(p):
print('dropped remaining %d' % len(p))
return v
def encode(v, t=None):
out = b''
if dataclasses.is_dataclass(v):
for f in dataclasses.fields(v):
out += encode(getattr(v, f.name), f.type)
elif isinstance(v, collections.abc.Iterable):
for f in v:
out += encode(f, t.inner())
else:
return struct.pack('<' + t.size(), v)
return out
class List(list):
def decode(cls, f, p: bytes) -> (bytes, object):
inner = cls.inner()
out = []
while len(p) >= 2:
out.append(struct.unpack('<H', p[:2])[0])
p = p[2:]
while p:
if dataclasses.is_dataclass(inner):
p, v = _decode(inner(), p)
else:
p, v = inner.decode(inner, None, p)
out.append(v)
return p, out
class uint32(int):
def size():
return 'I'
class Message:
def encode(self) -> bytes:
values = []
fmt = '<'
for f in dataclasses.fields(self):
fmt += f.type.__dict__['size']()
values.append(getattr(self, f.name))
return struct.pack(fmt, *values)
def decode(self, payload):
p = bytes(payload)
for f in dataclasses.fields(self):
if 'consume' in f.type.__dict__:
p, v = f.type.__dict__['consume'](p)
setattr(self, f.name, v)
else:
fmt = '<' + f.type.__dict__['size']()
want = struct.calcsize(fmt)
v = struct.unpack(fmt, p[:want])[0]
if 'scale' in f.type.__dict__:
v = f.type.__dict__['scale'](v)
else:
v = f.type(v)
setattr(self, f.name, v)
p = p[want:]
if len(p):
print('dropped remaining %d' % len(p))
return self
class uint16list(List):
def inner():
return uint16
@dataclasses.dataclass
class Ident(Message):
class Ident:
version: uint8 = 0
multitype: uint8 = 0
msp_version: uint8 = 0
@ -84,7 +117,7 @@ class Ident(Message):
return 100
class ArmingDisabledFlag(enum.IntFlag):
class ArmingDisabledFlag(PrimitiveMixin, enum.IntFlag):
NO_GYRO = 1 << 0
FAILSAFE = 1 << 1
RX_FAILSAFE = 1 << 2
@ -110,7 +143,7 @@ class ArmingDisabledFlag(enum.IntFlag):
return 'I'
class SensorsFlag(enum.IntFlag):
class SensorsFlag(PrimitiveMixin, enum.IntFlag):
ACC = 1 << 0
BARO = 1 << 1
MAG = 1 << 2
@ -122,25 +155,56 @@ class SensorsFlag(enum.IntFlag):
return 'H'
class FlightModeFlag(enum.IntFlag):
NONE = 0
ANGLE = 1 << 0
HORIZON = 1 << 1
MAG = 1 << 2
BARO = 1 << 3
GPS_HOME = 1 << 4
GPS_HOLD = 1 << 5
HEADFREE = 1 << 6
class FlightModeFlag(PrimitiveMixin, enum.IntFlag):
ARM = 1 << 0
ANGLE = 1 << 1
HORIZON = 1 << 2
MAG = 1 << 3
BARO = 1 << 4
GPSHOME = 1 << 5
GPSHOLD = 1 << 6
HEADFREE = 1 << 7
PASSTHRU = 1 << 8
FAILSAFE = 1 << 10
GPS_RESCUE = 1 << 11
FAILSAFE = 1 << 9
GPSRESCUE = 1 << 10
ANTIGRAVITY = 1 << 11
HEADADJ = 1 << 12
CAMSTAB = 1 << 13
BEEPERON = 1 << 14
LEDLOW = 1 << 15
CALIB = 1 << 16
OSD = 1 << 17
TELEMETRY = 1 << 18
SERVO1 = 1 << 19
SERVO2 = 1 << 20
SERVO3 = 1 << 21
BLACKBOX = 1 << 22
AIRMODE = 1 << 23
X3D = 1 << 24
FPVANGLEMIX = 1 << 25
BLACKBOXERASE = 1 << 26
CAMERA1 = 1 << 27
CAMERA2 = 1 << 28
CAMERA3 = 1 << 29
FLIPOVERAFTERCRASH = 1 << 30
PREARM = 1 << 31
BEEPGPSCOUNT = 1 << 32
VTXPITMODE = 1 << 33
PARALYZE = 1 << 34
USER1 = 1 << 35
USER2 = 1 << 36
USER3 = 1 << 37
USER4 = 1 << 38
PIDAUDIO = 1 << 39
ACROTRAINER = 1 << 40
def size():
return 'I'
@dataclasses.dataclass
class Status(Message):
class Status:
cycle_time: uint16 = 0
i2c_errors_count: uint16 = 0
sensors: SensorsFlag = 0
@ -158,7 +222,7 @@ class Status(Message):
@dataclasses.dataclass
class Servo(Message):
class Servo:
servos: uint16list = None
def command(self):
@ -166,7 +230,7 @@ class Servo(Message):
@dataclasses.dataclass
class Attitude(Message):
class Attitude:
roll: decidegrees = 0
pitch: decidegrees = 0
yaw: uint16 = 0
@ -175,94 +239,183 @@ class Attitude(Message):
return 108
def decoder():
while True:
while (yield None) != ord('$'):
print('sync')
continue
while True:
ch = yield None
if ch != ord('M'):
print('not-m', ch)
break
if (yield None) != ord('>'):
print('not->')
break
size = yield None
command = yield None
payload = b''
chk = command ^ size
for ch in range(length):
payload += ch
ch ^= ch
got = yield None
if got != ch:
break
if (yield (command, payload)) != '$':
break
class Link:
def __init__(self, port):
self.port = port
self.buf = None
def request(self, msg):
self.send(msg.command())
def send(self, command, data=None):
if data is None:
data = b''
chk = len(data) ^ command
for ch in data:
chk ^= ch
self.write(b'$M<' + struct.pack('<BBB', len(data), command, chk))
def write(self, b):
self.port.write(b)
def get(self) -> int:
if not self.buf:
self.buf = self.port.read()
if not self.buf:
raise Exception('Read timeout')
ch, self.buf = self.buf[0], self.buf[1:]
return ch
def read(self):
while True:
if self.get() != ord('$'):
print('sync')
continue
if self.get() != ord('M'):
print('not-m')
continue
if self.get() != ord('>'):
print('not->')
continue
size = self.get()
command = self.get()
payload = []
for _ in range(size):
payload.append(self.get())
chk = command ^ size
for p in payload:
chk ^= p
if self.get() != chk:
print('chk')
continue
return command, payload
def main():
p = serial.Serial('/dev/ttyUSB0', 115200, timeout=1)
l = Link(p)
while True:
# for t in Status, Servo:
for t in Attitude, :
l.request(t())
command, payload = l.read()
print(t().decode(payload))
main()
@dataclasses.dataclass
class RC:
rc_data: uint16list = None
def command(self):
return 105
class BatteryStateEnum(PrimitiveMixin, enum.IntEnum):
OK = 0
WARNING = 1
CRITICAL = 2
NOT_PRESENT = 3
INIT = 4
def size():
return 'B'
@dataclasses.dataclass
class BatteryState:
cell_count: uint8 = 0
capacity: uint16 = 0 # mAh
voltage: uint8 = 0 # dV
mah_drawn: uint16 = 0
amps: int16 = 0 # cA
state: BatteryStateEnum = 0
def command(self):
return 130
@dataclasses.dataclass
class SetRawRC:
channel: uint16list = None
def command(self):
return 200
@dataclasses.dataclass
class SetServoConfiguration:
idx: uint8 = 0
min: uint16 = 0
max: uint16 = 0
middle: uint16 = 0
rate: uint8 = 0
forward_from_channel: uint8 = 0
reversed_sources: uint32 = 0
def command(self):
return 200
class InputSource(PrimitiveMixin, enum.IntEnum):
STABILIZED_ROLL = 0
STABILIZED_PITCH = 1
STABILIZED_YAW = 2
STABILIZED_THROTTLE = 3
RC_ROLL = 4
RC_PITCH = 5
RC_YAW = 6
RC_THROTTLE = 7
RC_AUX1 = 8
RC_AUX2 = 9
RC_AUX3 = 10
RC_AUX4 = 11
GIMBAL_PITCH = 12
GIMBAL_ROLL = 13
SOURCE_COUNT = 14
def size(self):
return 'B'
class TargetChannel(PrimitiveMixin, enum.IntEnum):
GIMBAL_PITCH = 0
GIMBAL_ROLL = 1
FLAPS = 2
FLAPPERON_1 = 3
FLAPPERON_2 = 4
RUDDER = 5
ELEVATOR = 6
THROTTLE = 7
BICOPTER_LEFT = 4
BICOPTER_RIGHT = 5
DUALCOPTER_LEFT = 4
DUALCOPTER_RIGHT = 5
SINGLECOPTER_1 = 3
SINGLECOPTER_2 = 4
SINGLECOPTER_3 = 5
SINGLECOPTER_4 = 6
HELI_LEFT = 0
HELI_RIGHT = 1
HELI_TOP = 2
HELI_RUD = 3
def size(self):
return 'B'
@dataclasses.dataclass
class SetServoMixRule:
idx: uint8 = 0
target_channel: TargetChannel = 0
input_source: InputSource = 0
rate: int8 = 0
speed: uint8 = 0
min: int8 = 0
max: int8 = 0
box: uint8 = 0
@dataclasses.dataclass
class ServoConfiguration:
min: int16 = 0
max: int16 = 0
middle: int16 = 0
rate: int8 = 0
forward_from_channel: int8 = 0
revered_sources: uint32 = 0
class ServoConfigurationList(List):
def inner():
return ServoConfiguration
@dataclasses.dataclass
class ServoConfigurations:
configs: ServoConfigurationList = None
def command(self):
return 120
@dataclasses.dataclass
class SetArmingDisabled:
command: uint8 = 0
disable_runway_takeoff: uint8 = 0
def command(self):
return 99
@dataclasses.dataclass
class SetModeRange:
idx: uint8 = 0
box_id: uint8 = 0
aux_channel_idx: uint8 = 0
start_step: uint8 = 0
end_step: uint8 = 0
def command(self):
return 35
@dataclasses.dataclass
class ModeRange:
idx: uint8 = 0
aux_channel_idx: uint8 = 0
start_step: uint8 = 0
end_step: uint8 = 0
class ModeRangeList(List):
def inner():
return ModeRange
@dataclasses.dataclass
class ModeRanges:
ranges: ModeRangeList = None
def command(self):
return 35

@ -0,0 +1,90 @@
import struct
import time
import serial
import mspdef
class Link:
def __init__(self, port):
self.port = port
self.buf = None
def request(self, msg):
self.send(msg.command())
def push(self, msg):
self.send(msg.command(), mspdef.encode(msg))
def send(self, command, data=None):
data = data or b''
chk = len(data) ^ command
for ch in data:
chk ^= ch
self.write(b'$M<' + struct.pack('<BB', len(data), command) +
bytes(data) + bytes((chk, )))
def write(self, b):
self.port.write(b)
def get(self) -> int:
if not self.buf:
w = self.port.in_waiting or 1
self.buf = self.port.read(w)
print('< %r' % self.buf)
if not self.buf:
raise Exception('Read timeout')
ch, self.buf = self.buf[0], self.buf[1:]
return ch
def read(self):
while True:
if self.get() != ord('$'):
print('sync')
continue
if self.get() != ord('M'):
print('not-m')
continue
if self.get() != ord('>'):
print('not->')
continue
size = self.get()
command = self.get()
payload = []
for _ in range(size):
payload.append(self.get())
chk = command ^ size
for p in payload:
chk ^= p
if self.get() != chk:
print('chk')
continue
return command, payload
def main():
p = serial.Serial('/dev/ttyUSB0', 115200, timeout=1)
l = Link(p)
aux1 = 1000
ch1 = 1000
# print('\033[;H\033[J')
while True:
ch1 += 999
if ch1 >= 2000:
ch1 = 1000
# print('\033[;H')
for t in mspdef.Status, :
l.request(t())
command, payload = l.read()
s = mspdef.decode(t(), payload)
print(s)
if s.arming_disable_flags == 0:
aux1 = 1500
l.push(mspdef.SetRawRC([ch1, ch1, 1000, ch1, aux1]))
command, payload = l.read()
print(mspdef.decode(mspdef.SetRawRC(), payload))
time.sleep(0.1)
main()
Loading…
Cancel
Save