nppilot-v2/src/nppilot/scripts/revo.py

221 lines
6 KiB
Python
Executable file

#!/usr/bin/env python3
"""Revo adapts from high level demands into calibrated demands for the
rover.
"""
import time
import enum
import math
import queue
import rospy
import std_msgs
from std_msgs.msg import Float64, Bool
from sensor_msgs.msg import Imu
from geometry_msgs.msg import TwistWithCovarianceStamped
from rosflight_msgs.msg import Command, RCRaw, Status
import controller
import limiter
class RCChannel(enum.IntEnum):
roll = 0
pitch = 1
throttle = 2
yaw = 3
dial = 5
mode = 6
def _scale_throttle(x: float, scale: tuple) -> float:
"""Scale a raw -1 to 1 throttle input.
>>> round(_scale_throttle(-0.4, (0, 0.3, 1)), 5)
0.0
>>> round(_scale_throttle(-0.5, (0, 0.3, 1)), 5)
-0.16667
>>> round(_scale_throttle(-0.3, (0, 0.3, 1)), 5)
0.07143
>>> round(_scale_throttle(0.3, (0, 0.3, 1)), 5)
0.5
"""
low, zero, high = scale
x = (x + 1) / 2
if x < 0:
return low
if x > 1:
return high
if x < zero:
return (zero - x) / (low - zero)
return (x - zero) / (high - zero)
def _scale(x: float, scale: tuple) -> float:
"""Scale a -1 to 1 number to the given (low, zero, high) range.
>>> _scale(-0.5, (-3, 0, 4))
-1.5
>>> _scale(-0.5, (0.1, 0.3, 0.9))
0.2
>>> round(_scale(0.5, (0.1, 0.3, 0.9)), 5)
0.6
>>> round(_scale(0.8, (-0.8, 0.1, 0.9)), 5)
0.74
"""
low, zero, high = scale
if x <= -1:
return low
if x >= 1:
return high
if x < 0:
return zero + (zero - low) * x
return zero + (high - zero) * x
def _scale_rc(x: float) -> float:
"""Scale a RC time in us to [-1..1].
>>> round(_scale_rc(1500), 5)
0.0
>>> round(_scale_rc(1100), 5)
-0.8
>>> round(_scale_rc(1800), 5)
0.6
"""
if x <= 1000:
return -1
if x >= 2000:
return 1
return -1 + (x - 1000) / 500
class Controller:
def __init__(self):
rospy.init_node('revo', anonymous=True)
self._command_pub = rospy.Publisher('command', Command, queue_size=10)
self._mode = rospy.Publisher('mode', Float64, queue_size=10)
self._enable_z = rospy.Publisher('enable_z', Bool, queue_size=10)
self._enable_f = rospy.Publisher('enable_f', Bool, queue_size=10)
self._wz = rospy.Publisher('wz', Float64, queue_size=10)
self._wz_sp = rospy.Publisher('wz_sp', Float64, queue_size=10)
self._vx = rospy.Publisher('vx', Float64, queue_size=10)
self._vx_sp = rospy.Publisher('vx_sp', Float64, queue_size=10)
self._rc_rate = limiter.Rate(0.02)
self._cmd = [0] * 4
self._scales = [
(-0.9, 0, 0.9),
(-0.9, 0, 0.9),
(-0.9, 0, 0.9),
(0.1, 0.3, 0.9),
]
self._throttle_scale = (0.0, 0.3, 1.0)
max_accel = [
None,
None,
None,
1,
]
self._accel_limiter = [limiter.SlewLimiter(x) for x in max_accel]
self._rc_trim = [0.166, 0, 0, 0, 0, 0, 0, 0]
self._rc = [0] * 8
self._armed = False
self._wz_pid = controller.PID(
kp=0.2, ki=0.01, windup_limit=1, lower_limit=-1, upper_limit=1)
self._vx_pid = controller.PID(
kp=0.1, ki=0.01, windup_limit=1, lower_limit=0, upper_limit=0.7)
def rc_raw(self, msg):
rc = [_scale_rc(x) for x in msg.values]
rc[RCChannel.throttle] = _scale_throttle(rc[RCChannel.throttle], self._throttle_scale)
rc = [x + trim for x, trim in zip(rc, self._rc_trim)]
rc = [round(x, 5) for x in rc]
sp = round((rc[RCChannel.dial] + 1) / 2 * 8)
self._vx_sp.publish(sp)
self._vx.sp = sp
self._rc = rc
def status(self, msg):
self._armed = msg.armed
def imu_data(self, msg):
wz = msg.angular_velocity.z
t = msg.header.stamp.to_sec()
self._wz.publish(wz)
u = self._wz_pid.step(t, wz)
self._cmd[0] = u
self._cmd[1] = u
self._cmd[2] = u
if self._rc_rate.step(t):
self.send_command()
def fix_velocity(self, msg):
vx = math.sqrt(msg.twist.twist.linear.x**2 + msg.twist.twist.linear.y**2)
t = msg.header.stamp.to_sec()
self._vx.publish(vx)
u = self._vx_pid.step(t, vx)
self._cmd[3] = u
def send_command(self):
h = std_msgs.msg.Header()
h.stamp = rospy.Time.now()
cmd = [_scale(x, s) for x, s in zip(self._cmd, self._scales)]
cmd = [f(x) for x, f in zip(cmd, self._accel_limiter)]
self._command_pub.publish(Command(h, 0, 0, *cmd))
def send_enable(self):
z = abs(self._rc[RCChannel.roll]) < 0.1
self._enable_z.publish(z)
self._wz_pid.enable = z
f = (abs(self._rc[RCChannel.throttle]) < 0.1) and self._armed
self._enable_f.publish(f)
self._vx_pid.enable = f
def tick(self, msg):
self.send_enable()
def send_mode(self):
switch = self._rc[RCChannel.mode]
if switch < -0.3:
mode = 0
elif switch < 0.3:
mode = 1
else:
mode = 2
self._mode.publish(mode)
def run(self):
q = queue.Queue()
# rospy.Subscriber('cmd_z', Float64, lambda x: q.put((self.cmd_z, x)))
# rospy.Subscriber('cmd_f', Float64, lambda x: q.put((self.cmd_f, x)))
rospy.Subscriber('rc_raw', RCRaw, lambda x: q.put((self.rc_raw, x)))
rospy.Subscriber('status', Status, lambda x: q.put((self.status, x)))
rospy.Subscriber('imu/data', Imu, lambda x: q.put((self.imu_data, x)))
rospy.Subscriber('ublox_gps/fix_velocity', TwistWithCovarianceStamped,
lambda x: q.put((self.fix_velocity, x)))
rospy.Timer(rospy.Duration(0.1), lambda x: q.put((self.tick, x)))
while not rospy.is_shutdown():
try:
f, msg = q.get(timeout=0.5)
f(msg)
except queue.Empty:
pass
def main():
c = Controller()
c.run()
if __name__ == '__main__':
main()