nppilot: added a PID controller, limiter, and wired in.

master
Michael Hope 4 years ago
parent 804e9b4a6a
commit 124782ace5

@ -0,0 +1,85 @@
from typing import Optional, Tuple
def interp(v: float, table: Optional[Tuple]) -> float:
if table is None:
return 0
if v < table[0][0]:
return table[0][1]
if v > table[-1][0]:
return table[-1][1]
for i in range(1, len(table)):
if v <= table[i][0]:
x0, y0 = table[i - 1]
x1, y1 = table[i]
x = (v - x0) / (x1 - x0)
return y0 + (y1 - y0) * x
return table[-1][1]
class PID:
def __init__(self,
kp: float = 0,
ki: float = 0,
kd: float = 0,
ff: tuple = None,
deadband: Optional[tuple] = None,
windup_limit: Optional[float] = None,
lower_limit: float = None,
upper_limit: float = None):
self._kp = kp
self._ki = ki
self._kd = kd
self._windup_limit = windup_limit
self._ff = ff
self._deadband = deadband
self._lower_limit = lower_limit if lower_limit is not None else -1e12
self._upper_limit = upper_limit if upper_limit is not None else 1e12
self.enabled = True
self._sp = 0.
self._ti = 0.
self._errs = [0.] * 3
self._t: Optional[float] = None
def step(self, t: float, pv: float) -> float:
err = self._sp - pv
td = 0
if self._t is not None:
dt = t - self._t
self._ti += err * dt
if self._windup_limit is not None:
self._ti = max(-self._windup_limit, min(self._windup_limit, self._ti))
self._t = t
if not self.enabled:
self._ti = 0
u = self._kp * err + self._ki * self._ti + self._kd * td + interp(pv, self._ff)
if self._deadband is not None:
if u >= self._deadband[0]:
u += self._deadband[1]
elif u <= -self._deadband[0]:
u -= self._deadband[1]
u = min(self._upper_limit, max(self._lower_limit, u))
return u
class LPF:
def __init__(self):
self.c_ = 0
self.vs_ = (0., 0., 0.)
self.fs_ = (0., 0., 0.)
def step(self, v: float) -> float:
c = self.c_
vs = (v, self.vs_[0], self.vs_[1])
fs = self.fs_
f = (1 / (1 + c * c + 1.414 * c)) * (vs[2] + 2 * vs[1] + vs[0] -
(c * c - 1.414 * c + 1) * fs[1] -
(-2 * c * c + 2) * fs[0])
self.vs_ = vs
self.fs_ = (f, fs[0], fs[1])
return f

@ -0,0 +1,41 @@
import time
class Rate:
def __init__(self, cap):
self._cap = cap
self._t = None
def step(self, t):
if self._t is None:
self._t = t
return True
elapsed = t - self._t
if elapsed < self._cap:
return False
self._t += self._cap
return True
class SlewLimiter:
def __init__(self, rate: float, now=time.time):
self._rate = rate
self._a = None
self._t = None
self._now = now
def __call__(self, v: float) -> float:
if self._rate is None:
return v
now = self._now()
if self._t is None:
self._v = v
else:
limit = self._rate * (now - self._t)
delta = min(limit, max(-limit, v - self._v))
self._v += delta
self._t = now
return self._v

@ -1,23 +1,24 @@
#!/usr/bin/env python
#!/usr/bin/env python3
"""Revo adapts from high level demands into calibrated demands for the
rover.
"""
from __future__ import division, with_statement, print_function
import time
import types
import Queue as queue
import enum
import math
import queue
import rospy
import std_msgs
from std_msgs.msg import Float32, Float64, Bool
import std_msgs
from std_msgs.msg import Float64, Bool
from sensor_msgs.msg import Imu
from geometry_msgs.msg import Vector3Stamped
from geometry_msgs.msg import TwistWithCovarianceStamped
from rosflight_msgs.msg import Command, RCRaw, Status
import controller
import limiter
class RCChannel(enum.IntEnum):
roll = 0
@ -27,7 +28,7 @@ class RCChannel(enum.IntEnum):
mode = 6
def _scale_throttle(x, scale):
def _scale_throttle(x: float, scale: tuple) -> float:
"""Scale a raw -1 to 1 throttle input.
>>> round(_scale_throttle(-0.4, (0, 0.3, 1)), 5)
@ -50,7 +51,7 @@ def _scale_throttle(x, scale):
return (x - zero) / (high - zero)
def _scale(x, scale):
def _scale(x: float, scale: tuple) -> float:
"""Scale a -1 to 1 number to the given (low, zero, high) range.
>>> _scale(-0.5, (-3, 0, 4))
@ -72,7 +73,7 @@ def _scale(x, scale):
return zero + (high - zero) * x
def _scale_rc(x):
def _scale_rc(x: float) -> float:
"""Scale a RC time in us to [-1..1].
>>> round(_scale_rc(1500), 5)
@ -89,29 +90,6 @@ def _scale_rc(x):
return -1 + (x - 1000) / 500
class SlewLimiter:
def __init__(self, rate, now=time.time):
self._rate = rate
self._a = None
self._t = None
self._now = now
def __call__(self, v):
if self._rate == None:
return v
now = self._now()
if self._t == None:
self._v = v
else:
limit = self._rate * (now - self._t)
delta = min(limit, max(-limit, v - self._v))
self._v += delta
self._t = now
return self._v
class Controller:
def __init__(self):
rospy.init_node('revo', anonymous=True)
@ -121,6 +99,8 @@ class Controller:
self._enable_f = rospy.Publisher('enable_f', Bool, queue_size=10)
self._wz = rospy.Publisher('wz', Float64, queue_size=10)
self._wz_sp = rospy.Publisher('wz_sp', Float64, queue_size=10)
self._vx = rospy.Publisher('vx', Float64, queue_size=10)
self._rc_rate = limiter.Rate(0.02)
self._cmd = [0] * 4
self._scales = [
@ -136,23 +116,19 @@ class Controller:
None,
1,
]
self._accel_limiter = [SlewLimiter(x) for x in max_accel]
self._accel_limiter = [limiter.SlewLimiter(x) for x in max_accel]
self._rc_trim = [0.166, 0, 0, 0, 0, 0, 0, 0]
self._rc = [0] * 8
self._armed = False
def cmd_z(self, msg):
self._cmd[0] = msg.data
self._cmd[1] = msg.data
self._cmd[2] = msg.data
def cmd_f(self, msg):
self._cmd[3] = msg.data
self._wz_pid = controller.PID(
kp=0.2, ki=0.01, windup_limit=1, lower_limit=-1, upper_limit=1)
self._v_pid = controller.PID(
kp=0.1, ki=0.01, windup_limit=1, lower_limit=0, upper_limit=0.7)
def rc_raw(self, msg):
rc = [_scale_rc(x) for x in msg.values]
rc[RCChannel.throttle] = _scale_throttle(rc[RCChannel.throttle],
self._throttle_scale)
rc[RCChannel.throttle] = _scale_throttle(rc[RCChannel.throttle], self._throttle_scale)
rc = [x + trim for x, trim in zip(rc, self._rc_trim)]
rc = [round(x, 5) for x in rc]
self._rc = rc
@ -161,7 +137,24 @@ class Controller:
self._armed = msg.armed
def imu_data(self, msg):
self._wz.publish(msg.angular_velocity.z)
wz = msg.angular_velocity.z
t = msg.header.stamp.to_sec()
self._wz.publish(wz)
u = self._wz_pid.step(t, wz)
self._cmd[0] = u
self._cmd[1] = u
self._cmd[2] = u
if self._rc_rate.step(t):
self.send_command()
def fix_velocity(self, msg):
vx = math.sqrt(msg.twist.twist.linear.x**2 + msg.twist.twist.linear.y**2)
t = msg.header.stamp.to_sec()
self._vx.publish(vx)
u = self._v_pid.step(t, vx)
self._cmd[3] = u
def send_command(self):
h = std_msgs.msg.Header()
@ -171,9 +164,16 @@ class Controller:
self._command_pub.publish(Command(h, 0, 0, *cmd))
def send_enable(self):
self._enable_z.publish(abs(self._rc[RCChannel.roll]) < 0.1)
self._enable_f.publish((abs(self._rc[RCChannel.throttle]) < 0.1)
and self._armed)
z = abs(self._rc[RCChannel.roll]) < 0.1
self._enable_z.publish(z)
self._wz_pid.enable = z
f = (abs(self._rc[RCChannel.throttle]) < 0.1) and self._armed
self._enable_f.publish(f)
self._v_pid.enable = f
def tick(self, msg):
self.send_enable()
def send_mode(self):
switch = self._rc[RCChannel.mode]
@ -188,31 +188,22 @@ class Controller:
def run(self):
q = queue.Queue()
rospy.Subscriber('cmd_z', Float64, lambda x: q.put((self.cmd_z, x)))
rospy.Subscriber('cmd_f', Float64, lambda x: q.put((self.cmd_f, x)))
# rospy.Subscriber('cmd_z', Float64, lambda x: q.put((self.cmd_z, x)))
# rospy.Subscriber('cmd_f', Float64, lambda x: q.put((self.cmd_f, x)))
rospy.Subscriber('rc_raw', RCRaw, lambda x: q.put((self.rc_raw, x)))
rospy.Subscriber('status', Status, lambda x: q.put((self.status, x)))
rospy.Subscriber('imu/data', Imu, lambda x: q.put((self.imu_data, x)))
rospy.Subscriber('ublox_gps/fix_velocity', TwistWithCovarianceStamped,
lambda x: q.put((self.fix_velocity, x)))
rate = rospy.Rate(20)
cycle = 0
rospy.Timer(rospy.Duration(0.1), lambda x: q.put((self.tick, x)))
while not rospy.is_shutdown():
try:
while True:
f, msg = q.get_nowait()
f(msg)
f, msg = q.get(timeout=0.5)
f(msg)
except queue.Empty:
pass
self.send_command()
if cycle % 5 == 0:
self.send_enable()
self.send_mode()
self._wz_sp.publish(0)
cycle += 1
if cycle >= 100:
cycle = 0
rate.sleep()
def main():

@ -0,0 +1,67 @@
import controller
def test_interp():
table = (
(5, 10),
(6, 15),
(8, 20),
)
# Below gives minimum
assert controller.interp(4, table) == 10
# Above gives maximum
assert controller.interp(8.1, table) == 20
# Between is interpolated
assert controller.interp(5.5, table) == 12.5
assert controller.interp(7, table) == 17.5
def test_pid_kp():
p = controller.PID(kp=5)
# u = Kp * err
assert p.step(0, 10) == -50
def test_pid_ki():
p = controller.PID(ki=4, windup_limit=100)
p.step(0, 10)
# u = Ki * err over 0.5s
assert p.step(0.5, 10) == -20
assert p.step(0.7, 20) == -40 * 0.5 + -80 * 0.2
# Tops out at Ki * windup_limit
assert p.step(10, 10) == -400
assert p.step(30, -10) == 400
# Disabling clears the integral
p.enabled = False
assert p.step(31, -10) == 0
def test_pid_ff():
ff = (
(5, 10),
(10, 20),
(15, 40),
)
p = controller.PID(ff=ff)
# u = interp(ff, pv)
assert p.step(1, 12) == 28
def test_deadband():
p = controller.PID(kp=1, deadband=(0.5, 1.0))
# No deadband below limit
assert p.step(0, 0.4) == -0.4
assert p.step(0, -0.4) == 0.4
# Deadband above limit
assert p.step(0, 0.6) == -1.6
assert p.step(0, -0.6) == 1.6
def test_lpf():
f = controller.LPF()
for _ in range(20):
print(round(f.step(10), 3))
test_lpf()
Loading…
Cancel
Save