-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
186 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,3 @@ | ||
# This directory is for Python modules to be imported. | ||
# This directory is for Python modules to be imported. | ||
|
||
## [List of PX4's ROS 2 topics](https://github.com/PX4/PX4-Autopilot/blob/main/src/modules/uxrce_dds_client/dds_topics.yaml) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
import rclpy | ||
from rclpy.node import Node | ||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy | ||
from px4_msgs.msg import TrajectorySetpoint, VehicleAttitudeSetpoint, VehicleRatesSetpoint,\ | ||
OffboardControlMode, VehicleCommand, VehicleStatus, VehicleLocalPosition, VehicleOdometry | ||
|
||
|
||
class OffboardControlNode(Node): | ||
"""Node for controlling a vehicle in offboard mode.""" | ||
|
||
def __init__(self) -> None: | ||
super().__init__('offboard_control_node') | ||
|
||
# Configure QoS profile for publishing and subscribing | ||
qos_profile = QoSProfile( | ||
reliability=ReliabilityPolicy.BEST_EFFORT, | ||
durability=DurabilityPolicy.TRANSIENT_LOCAL, | ||
history=HistoryPolicy.KEEP_LAST, | ||
depth=1 | ||
) | ||
|
||
# Status publishers | ||
self.offboard_ctrl_mode_pub = self.create_publisher( | ||
OffboardControlMode, '/fmu/in/offboard_control_mode', qos_profile) | ||
self.command_pub = self.create_publisher( | ||
VehicleCommand, '/fmu/in/vehicle_command', qos_profile) | ||
|
||
# State publishers | ||
self.traj_setpt_pub = self.create_publisher( | ||
TrajectorySetpoint, '/fmu/in/trajectory_setpoint', qos_profile) | ||
self.att_setpt_pub = self.create_publisher( | ||
VehicleAttitudeSetpoint, 'fmu/in/vehicle_attitude_setpoint', qos_profile) | ||
self.rate_setpt_pub = self.create_publisher( | ||
VehicleRatesSetpoint, 'fmu/in/vehicle_rates_setpoint', qos_profile) | ||
|
||
# Subscribers | ||
self.status_sub = self.create_subscription( | ||
VehicleStatus, '/fmu/out/vehicle_status', self.status_cb, qos_profile) | ||
self.local_pos_sub = self.create_subscription( | ||
VehicleLocalPosition, '/fmu/out/vehicle_local_position', self.local_pos_cb, qos_profile) | ||
# state subscriber | ||
self.odom_sub = self.create_subscription( | ||
VehicleOdometry, '/fmu/out/vehicle_odometry', self.odom_cb, qos_profile) | ||
|
||
# Initialize variables | ||
self.setpt_count = 0 | ||
self.status = VehicleStatus() | ||
self.local_pos = VehicleLocalPosition() | ||
self.odom = VehicleOdometry() | ||
self.takeoff_height = -5.0 | ||
|
||
''' | ||
# Create a timer to publish control commands | ||
self.timer = self.create_timer(0.1, self.timer_cb) | ||
''' | ||
|
||
|
||
def status_cb(self, status): | ||
self.status = status | ||
|
||
|
||
def local_pos_cb(self, local_pos): | ||
self.local_pos = local_pos | ||
|
||
|
||
def odom_cb(self, odom): | ||
self.odom = odom | ||
|
||
|
||
def arm(self): | ||
self.send_command( | ||
VehicleCommand.VEHICLE_CMD_COMPONENT_ARM_DISARM, param1=1.0) | ||
self.get_logger().info('Arm command sent') | ||
|
||
|
||
def disarm(self): | ||
self.send_command( | ||
VehicleCommand.VEHICLE_CMD_COMPONENT_ARM_DISARM, param1=0.0) | ||
self.get_logger().info('Disarm command sent') | ||
|
||
|
||
def engage_offboard_mode(self): | ||
self.send_command( | ||
VehicleCommand.VEHICLE_CMD_DO_SET_MODE, param1=1.0, param2=6.0) | ||
self.get_logger().info("Switching to offboard mode") | ||
|
||
|
||
def land(self): | ||
self.send_command(VehicleCommand.VEHICLE_CMD_NAV_LAND) | ||
self.get_logger().info("Switching to land mode") | ||
|
||
|
||
def send_offboard_control_mode(self): | ||
msg = OffboardControlMode() | ||
msg.position = True | ||
msg.velocity = True | ||
msg.acceleration = False | ||
msg.attitude = True | ||
msg.body_rate = True | ||
msg.timestamp = int(self.get_clock().now().nanoseconds / 1000) | ||
self.offboard_ctrl_mode_pub.publish(msg) | ||
return | ||
|
||
|
||
def send_trajectory_setpoint(self, x:float, y:float, z:float, | ||
vx:float, vy:float, vz:float, | ||
yaw:float): | ||
"""Publish the trajectory setpoint.""" | ||
msg = TrajectorySetpoint() | ||
msg.position = [x, y, z] | ||
# msg.velocity = [vx, vy, vz] | ||
msg.yaw = yaw # (90 degree) | ||
msg.timestamp = int(self.get_clock().now().nanoseconds / 1000) | ||
self.traj_setpt_pub.publish(msg) | ||
self.get_logger().info(f"Publishing position setpoints {[x, y, z]}") | ||
return | ||
|
||
|
||
def send_attitude_setpoint(self): | ||
return | ||
|
||
|
||
def send_rate_setpoint(self): | ||
return | ||
|
||
|
||
def send_command(self, command, **params) -> None: | ||
"""Publish a vehicle command.""" | ||
msg = VehicleCommand() | ||
msg.command = command | ||
msg.param1 = params.get("param1", 0.0) | ||
msg.param2 = params.get("param2", 0.0) | ||
msg.param3 = params.get("param3", 0.0) | ||
msg.param4 = params.get("param4", 0.0) | ||
msg.param5 = params.get("param5", 0.0) | ||
msg.param6 = params.get("param6", 0.0) | ||
msg.param7 = params.get("param7", 0.0) | ||
msg.target_system = 1 | ||
msg.target_component = 1 | ||
msg.source_system = 1 | ||
msg.source_component = 1 | ||
msg.from_external = True | ||
msg.timestamp = int(self.get_clock().now().nanoseconds / 1000) | ||
self.command_pub.publish(msg) | ||
return | ||
|
||
|
||
''' | ||
def timer_cb(self) -> None: | ||
"""Callback function for the timer.""" | ||
self.send_offboard_control_mode() | ||
if self.offboard_setpoint_counter == 10: | ||
self.engage_offboard_mode() | ||
self.arm() | ||
if self.vehicle_local_position.z > self.takeoff_height and self.vehicle_status.nav_state == VehicleStatus.NAVIGATION_STATE_OFFBOARD: | ||
self.send_position_setpoint(0.0, 0.0, self.takeoff_height) | ||
elif self.vehicle_local_position.z <= self.takeoff_height: | ||
self.land() | ||
exit(0) | ||
if self.offboard_setpoint_counter < 11: | ||
self.offboard_setpoint_counter += 1 | ||
''' | ||
|
||
''' | ||
def main(args=None) -> None: | ||
print('Starting offboard control node...') | ||
rclpy.init(args=args) | ||
offboard_control = OffboardControlNode() | ||
rclpy.spin(offboard_control) | ||
offboard_control.destroy_node() | ||
rclpy.shutdown() | ||
if __name__ == '__main__': | ||
try: | ||
main() | ||
except Exception as e: | ||
print(e) | ||
''' |