#!/usr/bin/env python3 """Revo adapts from high level demands into calibrated demands for the rover. """ import time import enum import math import queue import rospy import std_msgs from std_msgs.msg import Float64, Bool from sensor_msgs.msg import Imu from geometry_msgs.msg import TwistWithCovarianceStamped from rosflight_msgs.msg import Command, RCRaw, Status import controller import limiter class RCChannel(enum.IntEnum): roll = 0 pitch = 1 throttle = 2 yaw = 3 dial = 5 mode = 6 def _scale_throttle(x: float, scale: tuple) -> float: """Scale a raw -1 to 1 throttle input. >>> round(_scale_throttle(-0.4, (0, 0.3, 1)), 5) 0.0 >>> round(_scale_throttle(-0.5, (0, 0.3, 1)), 5) -0.16667 >>> round(_scale_throttle(-0.3, (0, 0.3, 1)), 5) 0.07143 >>> round(_scale_throttle(0.3, (0, 0.3, 1)), 5) 0.5 """ low, zero, high = scale x = (x + 1) / 2 if x < 0: return low if x > 1: return high if x < zero: return (zero - x) / (low - zero) return (x - zero) / (high - zero) def _scale(x: float, scale: tuple) -> float: """Scale a -1 to 1 number to the given (low, zero, high) range. >>> _scale(-0.5, (-3, 0, 4)) -1.5 >>> _scale(-0.5, (0.1, 0.3, 0.9)) 0.2 >>> round(_scale(0.5, (0.1, 0.3, 0.9)), 5) 0.6 >>> round(_scale(0.8, (-0.8, 0.1, 0.9)), 5) 0.74 """ low, zero, high = scale if x <= -1: return low if x >= 1: return high if x < 0: return zero + (zero - low) * x return zero + (high - zero) * x def _scale_rc(x: float) -> float: """Scale a RC time in us to [-1..1]. >>> round(_scale_rc(1500), 5) 0.0 >>> round(_scale_rc(1100), 5) -0.8 >>> round(_scale_rc(1800), 5) 0.6 """ if x <= 1000: return -1 if x >= 2000: return 1 return -1 + (x - 1000) / 500 class Controller: def __init__(self): rospy.init_node('revo', anonymous=True) self._command_pub = rospy.Publisher('command', Command, queue_size=10) self._mode = rospy.Publisher('mode', Float64, queue_size=10) self._enable_z = rospy.Publisher('enable_z', Bool, queue_size=10) self._enable_f = rospy.Publisher('enable_f', Bool, queue_size=10) self._wz = rospy.Publisher('wz', Float64, queue_size=10) self._wz_sp = rospy.Publisher('wz_sp', Float64, queue_size=10) self._vx = rospy.Publisher('vx', Float64, queue_size=10) self._vx_sp = rospy.Publisher('vx_sp', Float64, queue_size=10) self._rc_rate = limiter.Rate(0.02) self._cmd = [0] * 4 self._scales = [ (-0.9, 0, 0.9), (-0.9, 0, 0.9), (-0.9, 0, 0.9), (0.1, 0.3, 0.9), ] self._throttle_scale = (0.0, 0.3, 1.0) max_accel = [ None, None, None, 1, ] self._accel_limiter = [limiter.SlewLimiter(x) for x in max_accel] self._rc_trim = [0.166, 0, 0, 0, 0, 0, 0, 0] self._rc = [0] * 8 self._armed = False self._wz_pid = controller.PID( kp=0.2, ki=0.01, windup_limit=1, lower_limit=-1, upper_limit=1) self._vx_pid = controller.PID( kp=0.1, ki=0.01, windup_limit=1, lower_limit=0, upper_limit=0.7) def rc_raw(self, msg): rc = [_scale_rc(x) for x in msg.values] rc[RCChannel.throttle] = _scale_throttle(rc[RCChannel.throttle], self._throttle_scale) rc = [x + trim for x, trim in zip(rc, self._rc_trim)] rc = [round(x, 5) for x in rc] sp = round((rc[RCChannel.dial] + 1) / 2 * 8) self._vx_sp.publish(sp) self._vx.sp = sp self._rc = rc def status(self, msg): self._armed = msg.armed def imu_data(self, msg): wz = msg.angular_velocity.z t = msg.header.stamp.to_sec() self._wz.publish(wz) u = self._wz_pid.step(t, wz) self._cmd[0] = u self._cmd[1] = u self._cmd[2] = u if self._rc_rate.step(t): self.send_command() def fix_velocity(self, msg): vx = math.sqrt(msg.twist.twist.linear.x**2 + msg.twist.twist.linear.y**2) t = msg.header.stamp.to_sec() self._vx.publish(vx) u = self._vx_pid.step(t, vx) self._cmd[3] = u def send_command(self): h = std_msgs.msg.Header() h.stamp = rospy.Time.now() cmd = [_scale(x, s) for x, s in zip(self._cmd, self._scales)] cmd = [f(x) for x, f in zip(cmd, self._accel_limiter)] self._command_pub.publish(Command(h, 0, 0, *cmd)) def send_enable(self): z = abs(self._rc[RCChannel.roll]) < 0.1 self._enable_z.publish(z) self._wz_pid.enable = z f = (abs(self._rc[RCChannel.throttle]) < 0.1) and self._armed self._enable_f.publish(f) self._vx_pid.enable = f def tick(self, msg): self.send_enable() def send_mode(self): switch = self._rc[RCChannel.mode] if switch < -0.3: mode = 0 elif switch < 0.3: mode = 1 else: mode = 2 self._mode.publish(mode) def run(self): q = queue.Queue() # rospy.Subscriber('cmd_z', Float64, lambda x: q.put((self.cmd_z, x))) # rospy.Subscriber('cmd_f', Float64, lambda x: q.put((self.cmd_f, x))) rospy.Subscriber('rc_raw', RCRaw, lambda x: q.put((self.rc_raw, x))) rospy.Subscriber('status', Status, lambda x: q.put((self.status, x))) rospy.Subscriber('imu/data', Imu, lambda x: q.put((self.imu_data, x))) rospy.Subscriber('ublox_gps/fix_velocity', TwistWithCovarianceStamped, lambda x: q.put((self.fix_velocity, x))) rospy.Timer(rospy.Duration(0.1), lambda x: q.put((self.tick, x))) while not rospy.is_shutdown(): try: f, msg = q.get(timeout=0.5) f(msg) except queue.Empty: pass def main(): c = Controller() c.run() if __name__ == '__main__': main()