-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_ekf
112 lines (98 loc) · 4.61 KB
/
run_ekf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from rclpy.qos import qos_profile_sensor_data
import tf_transformations
import numpy as np
from localization_project.ekf import RobotEKF
from localization_project.motion_models import velocity_motion_model, get_odometry_input, odometry_motion_model
from localization_project.measurement_model import range_and_bearing, z_landmark, residual
class robot_localization_kalman_filter(Node):
def __init__(self):
super().__init__('robot_ekf')
self.ground_truth_sub = self.create_subscription(Odometry, '/ground_truth', self.ground_truth_callback, 10)
self.odom_sub = self.create_subscription(Odometry, '/diff_drive_controller/odom', self.odometry_callback, 10)
self.velocity_sub = self.create_subscription(Odometry, '/diff_drive_controller/odom', self.velocity_callback, 10)
self.ekf_pub = self.create_publisher(Odometry, '/ekf', 10)
self.ground_truth = np.array([0.0, 0.0, 0.0])
#self.ground_truth_sub.set_rate(10)
self.ekf_rate = self.create_timer(1.0, self.prediction_step_callback)
self.ekf_period_s = 1.0
self.initial_pose = [-2.0, 0.0, 0.0] #x,y,yaw
self.x = self.initial_pose[0]
self.y = self.initial_pose[1]
self.theta = self.initial_pose[2]
self.v = 0.01
self.w = 0.01
self.mu = np.zeros((3,1))
self.initial_covariance = 0.0001*np.eye(3) #initial covariance
self.lmark = [-1.1, -1.1, -1.1, 0.0, -1.1, 1.1,
0.0, -1.1, 0.0, 0.0, 0.0, 1.1,
1.1, -1.1, 1.1, 0.0, 1.1, 1.1] #landmarks list: even = y, odd = x
self.std_rot1 = 0.05
self.std_trasl = 0.05
self.std_rot2 = 0.05
self.std_lin_vel = 0.0001
self.std_ang_vel = 0.0001
self.std_rng = 0.3
self.std_brg = np.deg2rad(1.0)
self.max_range = 8.0
self.fov_deg = np.deg2rad(45.0)
self.z = np.array([0.0, 0.0]).T
eval_hx, eval_Ht = range_and_bearing()
eval_gux, eval_Gt, eval_Vt = velocity_motion_model()
eval_gux_odom, eval_Gt_odom, eval_Vt_odom = odometry_motion_model()
self.ekf = RobotEKF(dim_x=3, dim_z=1, dim_u=2,
eval_gux=eval_gux, eval_Gt=eval_Gt, eval_Vt=eval_Vt,
eval_hx=eval_hx, eval_Ht=eval_Ht)
def odometry_callback(self, msgs):
quat=[msgs.pose.pose.orientation.x,msgs.pose.pose.orientation.y,msgs.pose.pose.orientation.z,msgs.pose.pose.orientation.w]
_, _, self.theta = tf_transformations.euler_from_quaternion(quat)
self.x = msgs.pose.pose.position.x
self.y = msgs.pose.pose.position.y
print('x',)
print(self.x)
print('y')
print(self.y)
print('yaw')
print(self.theta)
def velocity_callback(self,msgs):
self.v = msgs.twist.twist.linear.x
self.w = msgs.twist.twist.angular.z
print('v')
print(self.v)
print('w')
print(self.w)
def prediction_step_callback(self):
self.ekf.predict(u =np.array([[self.v, self.w]]).T, g_extra_args=[self.ekf_rate.timer_period_ns*1e9])
def ground_truth_callback(self, msg):
quat = [msg.pose.pose.orientation.x, msg.pose.pose.orientation.y, msg.pose.pose.orientation.z, msg.pose.pose.orientation.w]
_, _, self.ground_truth[2] = tf_transformations.euler_from_quaternion(quat)
self.ground_truth[0] = msg.pose.pose.position.x
self.ground_truth[1] = msg.pose.pose.position.y
def update_step_callback(self):
#self.ekf.update(self.z, self.lmark, residual = np.subtract)
for i in range(0, len(self.lmark), 2):
lmark = [self.lmark[i], self.lmark[i + 1]]
z = z_landmark(np.array([self.ground_truth]).T, lmark, self.ekf.eval_hx,
self.std_rng, self.std_brg)
if z is not None:
self.z = z
# Perform update step
self.ekf.update(self.z, lmark, residual=np.subtract)
ekf_estimate = self.ekf.mu
ekf_msg = Odometry()
ekf_msg.pose.pose.position.x = ekf_estimate[0, 0]
ekf_msg.pose.pose.position.y = ekf_estimate[1, 0]
ekf_msg.pose.pose.orientation.z = np.sin(ekf_estimate[2, 0] / 2.0)
ekf_msg.pose.pose.orientation.w = np.cos(ekf_estimate[2, 0] / 2.0)
# Publish the result
print("Published EKF message:", ekf_msg) # Add this line
self.ekf_pub.publish(ekf_msg)
def main(args=None):
rclpy.init(args=args)
kalman_filter=robot_localization_kalman_filter()
rclpy.spin(kalman_filter)
rclpy.shutdown()
if __name__=='__main__':
main()