parent
aee51e3f8e
commit
e06560584a
@ -0,0 +1,137 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Revo adapts from high level demands into calibrated demands for the
|
||||
rover.
|
||||
"""
|
||||
|
||||
import utm
|
||||
import time
|
||||
import enum
|
||||
import math
|
||||
import queue
|
||||
|
||||
import numpy as np
|
||||
|
||||
import rospy
|
||||
|
||||
import std_msgs
|
||||
from std_msgs.msg import Float64, Bool
|
||||
from sensor_msgs.msg import Imu, NavSatFix
|
||||
from geometry_msgs.msg import TwistWithCovarianceStamped, PoseWithCovariance, TwistWithCovariance, Pose, Point, Quaternion, Twist, Vector3
|
||||
from rosflight_msgs.msg import Command, RCRaw, Status
|
||||
from nav_msgs.msg import Odometry
|
||||
|
||||
|
||||
class Filter:
|
||||
def __init__(self, tau):
|
||||
self._acc = 0
|
||||
self._tau = tau
|
||||
self._t = None
|
||||
|
||||
def step(self, t, v):
|
||||
if self._t is None:
|
||||
self._acc = v
|
||||
else:
|
||||
c = self._tau * (t - self._t).to_sec()
|
||||
self._acc = self._acc * (1 - c) + v * c
|
||||
self._t = t
|
||||
return self._acc
|
||||
|
||||
|
||||
def run(subscribe):
|
||||
q = queue.Queue()
|
||||
subscribe(q)
|
||||
|
||||
while not rospy.is_shutdown():
|
||||
try:
|
||||
f, msg = q.get(timeout=0.2)
|
||||
f(msg)
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
|
||||
BASE = (47.35, 8.5)
|
||||
|
||||
|
||||
class Revo:
|
||||
def __init__(self):
|
||||
rospy.init_node('revosim', anonymous=True)
|
||||
|
||||
self._odo_pub = rospy.Publisher('odometry', Odometry, queue_size=10)
|
||||
|
||||
self.pos = np.zeros(2)
|
||||
self.v = Filter(3)
|
||||
self.theta = 0
|
||||
self.w = 0
|
||||
self.w_filt = Filter(1)
|
||||
self.dt = 0.05
|
||||
self.gps_dt = 0.1
|
||||
self.vodom = (0, 0)
|
||||
|
||||
self.v_sp = 1
|
||||
self.w_sp = 0.2
|
||||
self.gps_base = utm.from_latlon(*BASE)
|
||||
|
||||
def subscribe(self, q):
|
||||
rospy.Timer(rospy.Duration(self.dt), lambda x: q.put((self.step, x)))
|
||||
rospy.Timer(rospy.Duration(self.gps_dt), lambda x: q.put((self.step_gps, x)))
|
||||
|
||||
def step_gps(self, msg):
|
||||
now = rospy.Time.now()
|
||||
ll = utm.to_latlon(self.gps_base[0] + self.pos[0], self.gps_base[1] + self.pos[1],
|
||||
self.gps_base[2], self.gps_base[3])
|
||||
self.fix_pub.publish(NavSatFix(latitude=ll[0], longitude=ll[1]))
|
||||
h = std_msgs.msg.Header(stamp=now, frame_id='map')
|
||||
self.fix_velocity_pub.publish(
|
||||
TwistWithCovarianceStamped(
|
||||
header=h,
|
||||
twist=TwistWithCovariance(twist=Twist(linear=Vector3(x=self.vo[0], y=self.vo[1])))))
|
||||
|
||||
def step(self, msg):
|
||||
now = rospy.Time.now()
|
||||
|
||||
vb = self.v.step(now, self.v_sp)
|
||||
self.vo = np.array((math.cos(self.theta), math.sin(self.theta))) * vb
|
||||
self.pos += self.vo * self.dt
|
||||
|
||||
self.w = self.w_filt.step(now, self.w_sp)
|
||||
self.theta += self.w * self.dt
|
||||
if self.theta >= math.pi:
|
||||
self.theta -= 2 * math.pi
|
||||
if self.theta <= -math.pi:
|
||||
self.theta += 2 * math.pi
|
||||
|
||||
h = std_msgs.msg.Header(stamp=now, frame_id='map')
|
||||
self.imu_pub.publish(
|
||||
header=h,
|
||||
orientation=Quaternion(z=math.sin(self.theta / 2), w=math.cos(self.theta / 2)),
|
||||
angular_velocity=Vector3(z=self.w),
|
||||
)
|
||||
|
||||
|
||||
# q = [0, 0, math.sin(a / 2), math.cos(a / 2)]
|
||||
|
||||
# a = self.theta
|
||||
# pose = PoseWithCovariance(
|
||||
# pose=Pose(
|
||||
# position=Point(self.pos[0], self.pos[1], 0),
|
||||
# orientation=Quaternion(*q),
|
||||
# ))
|
||||
# twist = TwistWithCovariance(
|
||||
# twist=Twist(linear=Vector3(x=self.vo[0], y=self.vo[1]), angular=Vector3(z=self.w)))
|
||||
# self._odo_pub.publish(Odometry(h, '', pose, twist))
|
||||
|
||||
def run(self):
|
||||
self.fix_pub = rospy.Publisher('ublox_gps/fix', NavSatFix, queue_size=10)
|
||||
self.fix_velocity_pub = rospy.Publisher(
|
||||
'ublox_gps/fix_velocity', TwistWithCovarianceStamped, queue_size=10)
|
||||
self.imu_pub = rospy.Publisher('imu/data', Imu, queue_size=10)
|
||||
run(self.subscribe)
|
||||
|
||||
|
||||
def main():
|
||||
c = Revo()
|
||||
c.run()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Reference in new issue