Skip to content

Commit

Permalink
works
Browse files Browse the repository at this point in the history
  • Loading branch information
InvincibleRMC committed Apr 16, 2024
1 parent f4a28dc commit f510607
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 23 deletions.
12 changes: 12 additions & 0 deletions config/sub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mqtt_ros_bridge:
ros__parameters:
turtle_pub:
topic: "/turtle1/cmd_vel"
type: "geometry_msgs.msg:Twist"
publish_on_ros: False

yippee:
topic: "foo"
type: "std_msgs.msg:String"
publish_on_ros: True
use_ros_serializer: True
2 changes: 1 addition & 1 deletion launch/demo_pub_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def generate_launch_description() -> LaunchDescription:
executable='bridge_node',
emulate_tty=True,
output='screen',
parameters=[config]
arguments=[config]
)

turtle_sim = Node(
Expand Down
22 changes: 21 additions & 1 deletion launch/demo_sub_launch.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import os

from ament_index_python.packages import get_package_share_directory
from launch.actions import SetEnvironmentVariable
from launch.launch_description import LaunchDescription
from launch_ros.actions import Node
Expand All @@ -13,14 +16,31 @@ def generate_launch_description() -> LaunchDescription:
Launches bridge_node.
"""

config = os.path.join(
get_package_share_directory('mqtt_ros_bridge'),
'config',
'sub.yaml'
)

run_bridge_node = Node(
package='mqtt_ros_bridge',
executable='bridge_node',
emulate_tty=True,
output='screen',
arguments=[config]
)

turtle_sim = Node(
package='rqt_robot_steering',
executable='rqt_robot_steering',
emulate_tty=True,
parameters=[{"topic": "/turtle1/cmd_vel"}],
output='screen'
)

return LaunchDescription([
SetEnvironmentVariable("ROS_DOMAIN_ID", "1"),
run_bridge_node
run_bridge_node,
turtle_sim
])
74 changes: 53 additions & 21 deletions mqtt_ros_bridge/bridge_node.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import sys
from typing import Any, Callable, Generic

import paho.mqtt.client as MQTT
Expand All @@ -9,24 +11,28 @@
from mqtt_ros_bridge.msg_typing import MsgLikeT
from mqtt_ros_bridge.serializer import (JSONSerializer, ROSDefaultSerializer,
Serializer)
from mqtt_ros_bridge.util import lookup_object
from mqtt_ros_bridge.util import lookup_object, parameter_dict_from_yaml_file


class TopicInfo(Generic[MsgLikeT]):
"""Metadata about a single topic."""

def __init__(self, name: str, msg_object_path: str,
def __init__(self, topic: str, msg_object_path: str,
publish_on_ros: bool, use_ros_serializer: bool = True) -> None:

self.name = name
self.msg_type = lookup_object(msg_object_path)
self.topic = topic
self.msg_type: MsgLikeT = lookup_object(msg_object_path)
self.publish_on_ros = publish_on_ros

if use_ros_serializer:
self.serializer: type[Serializer] = ROSDefaultSerializer
else:
self.serializer = JSONSerializer

def __str__(self) -> str:
return (f"Topic: {self.topic}, Message Type: {self.msg_type}, Publish on ROS:"
f"{self.publish_on_ros}, Serializer: {self.serializer}")


# TOPICS: dict[str, TopicInfo] = {
# '/turtle1/cmd_vel': TopicInfo('/turtle1/cmd_vel', "std_msgs.msg:String", False),
Expand All @@ -45,14 +51,22 @@ def __init__(self, name: str, msg_object_path: str,
class BridgeNode(Node):
"""Node to bridge MQTT and ROS."""

def __init__(self) -> None:
def __init__(self, args: list[str]) -> None:
super().__init__('mqtt_bridge_node')

self.topics = self.topic_info_from_parameters()

# TODO get from parameters
DEBUG = True

print(args)

if (len(args) != 2 and "--ros-args" not in args) or not (len(args) == 3 and
"--ros-args" in args):
raise ValueError("To many arguments given")

self.topics = self.topic_info_from_parameters(args[1])

self.get_logger().info(str(self.topics))

self.get_logger().info('Creating MQTT ROS bridge node')

self.mqtt_client = MQTT.Client()
Expand All @@ -64,25 +78,43 @@ def __init__(self) -> None:

for topic_info in self.topics.values():
if topic_info.publish_on_ros:
publisher = self.create_publisher(topic_info.msg_type, topic_info.name, 10)
self.ros_publishers[topic_info.name] = publisher
self.mqtt_client.subscribe(topic_info.name)
publisher = self.create_publisher(topic_info.msg_type, topic_info.topic, 10)
self.ros_publishers[topic_info.topic] = publisher
self.mqtt_client.subscribe(topic_info.topic)
else:
callback = self.make_ros_callback(topic_info)
# TODO proper QOS?
self.create_subscription(topic_info.msg_type, topic_info.name, callback, 10)
self.create_subscription(topic_info.msg_type, topic_info.topic, callback, 10)

self.mqtt_client.on_message = self.mqtt_callback

def topic_info_from_parameters(self) -> dict[str, TopicInfo]:
dictionary = {}
def topic_info_from_parameters(self, config: str) -> dict[str, TopicInfo]:

config = os.path.expanduser(config)
topic_infos: dict[str, TopicInfo] = {}

params = parameter_dict_from_yaml_file(config)

params = self.get_parameters_by_prefix('')
unique_names: set[str] = set()
for names in params.keys():
# TODO Check that right half matches PARAMETER_*
unique_names.add(names.split(".")[0])

self.get_logger().info(str(params))
for name in unique_names:

if params.get(f"{name}.{PARAMETER_USE_ROS_SERIALIZER}", None):
ros_serialiser = params[f"{name}.{PARAMETER_USE_ROS_SERIALIZER}"].value
else:
ros_serialiser = False

return dictionary
topic_infos[params[f"{name}.{PARAMETER_TOPIC}"].value] = (TopicInfo(
params[f"{name}.{PARAMETER_TOPIC}"].value,
params[f"{name}.{PARAMETER_TYPE}"].value,
params[f"{name}.{PARAMETER_PUBLISH_ON_ROS}"].value,
ros_serialiser
))

return topic_infos

def make_ros_callback(self, topic_info: TopicInfo[MsgLikeT]) -> Callable[[MsgLikeT], None]:
"""
Expand All @@ -95,8 +127,8 @@ def make_ros_callback(self, topic_info: TopicInfo[MsgLikeT]) -> Callable[[MsgLik
"""
def callback(msg: MsgLikeT) -> None:
self.get_logger().info(f'ROS RECEIVED: Topic: "{topic_info.name}" Payload: "{msg}"')
self.mqtt_client.publish(topic_info.name, topic_info.serializer.serialize(msg))
self.get_logger().info(f'ROS RECEIVED: Topic: "{topic_info.topic}" Payload: "{msg}"')
self.mqtt_client.publish(topic_info.topic, topic_info.serializer.serialize(msg))

return callback

Expand All @@ -118,7 +150,7 @@ def mqtt_callback(self, _client: MQTT.Client,
topic_info = self.topics[mqtt_msg.topic]

self.get_logger().info(
f'MQTT RECEIVED: Topic: "{topic_info.name}" Payload: "{mqtt_msg.payload!r}"')
f'MQTT RECEIVED: Topic: "{topic_info.topic}" Payload: "{mqtt_msg.payload!r}"')

try:
ros_msg = topic_info.serializer.deserialize(mqtt_msg.payload, topic_info.msg_type)
Expand All @@ -127,14 +159,14 @@ def mqtt_callback(self, _client: MQTT.Client,
f'MQTT on topic "{mqtt_msg.topic}": "{mqtt_msg.payload!r}"')
return

self.ros_publishers[topic_info.name].publish(ros_msg)
self.ros_publishers[topic_info.topic].publish(ros_msg)


def main(args=None):
"""Run bridge node; used in ROS executable."""
rclpy.init(args=args)

bridge_node = BridgeNode()
bridge_node = BridgeNode(sys.argv)

rclpy.spin(bridge_node)

Expand Down
169 changes: 169 additions & 0 deletions mqtt_ros_bridge/util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
from importlib import import_module
from typing import Optional

import yaml
from rcl_interfaces.msg import Parameter as ParameterMsg
from rcl_interfaces.msg import ParameterType, ParameterValue
from rclpy.parameter import PARAMETER_SEPARATOR_STRING, Parameter


def lookup_object(object_path: str, package: str = 'mqtt_ros_bridge') -> object:
Expand All @@ -7,3 +13,166 @@ def lookup_object(object_path: str, package: str = 'mqtt_ros_bridge') -> object:
module = import_module(module_name, package)
obj = getattr(module, obj_name)
return obj


# Kind of exist in ros-iron??? very confusing
def get_parameter_value(string_value: str) -> ParameterValue:
"""
Guess the desired type of the parameter based on the string value.
:param string_value: The string value to be converted to a ParameterValue.
:return: The ParameterValue.
"""
value = ParameterValue()
try:
yaml_value = yaml.safe_load(string_value)
except yaml.parser.ParserError:
yaml_value = string_value

if isinstance(yaml_value, bool):
value.type = ParameterType.PARAMETER_BOOL
value.bool_value = yaml_value
elif isinstance(yaml_value, int):
value.type = ParameterType.PARAMETER_INTEGER
value.integer_value = yaml_value
elif isinstance(yaml_value, float):
value.type = ParameterType.PARAMETER_DOUBLE
value.double_value = yaml_value
elif isinstance(yaml_value, list):
if all((isinstance(v, bool) for v in yaml_value)):
value.type = ParameterType.PARAMETER_BOOL_ARRAY
value.bool_array_value = yaml_value
elif all((isinstance(v, int) for v in yaml_value)):
value.type = ParameterType.PARAMETER_INTEGER_ARRAY
value.integer_array_value = yaml_value
elif all((isinstance(v, float) for v in yaml_value)):
value.type = ParameterType.PARAMETER_DOUBLE_ARRAY
value.double_array_value = yaml_value
elif all((isinstance(v, str) for v in yaml_value)):
value.type = ParameterType.PARAMETER_STRING_ARRAY
value.string_array_value = yaml_value
else:
value.type = ParameterType.PARAMETER_STRING
value.string_value = string_value
else:
value.type = ParameterType.PARAMETER_STRING
value.string_value = yaml_value if yaml_value is not None else string_value
return value


def parameter_value_to_python(parameter_value: ParameterValue):
"""
Get the value for the Python builtin type from a rcl_interfaces/msg/ParameterValue object.
Returns the value member of the message based on the ``type`` member.
Returns ``None`` if the parameter is "NOT_SET".
:param parameter_value: The message to get the value from.
:raises RuntimeError: if the member ``type`` has an unexpected value.
"""
if parameter_value.type == ParameterType.PARAMETER_BOOL:
value = parameter_value.bool_value
elif parameter_value.type == ParameterType.PARAMETER_INTEGER:
value = parameter_value.integer_value
elif parameter_value.type == ParameterType.PARAMETER_DOUBLE:
value = parameter_value.double_value
elif parameter_value.type == ParameterType.PARAMETER_STRING:
value = parameter_value.string_value
elif parameter_value.type == ParameterType.PARAMETER_BYTE_ARRAY:
value = list(parameter_value.byte_array_value)
elif parameter_value.type == ParameterType.PARAMETER_BOOL_ARRAY:
value = list(parameter_value.bool_array_value)
elif parameter_value.type == ParameterType.PARAMETER_INTEGER_ARRAY:
value = list(parameter_value.integer_array_value)
elif parameter_value.type == ParameterType.PARAMETER_DOUBLE_ARRAY:
value = list(parameter_value.double_array_value)
elif parameter_value.type == ParameterType.PARAMETER_STRING_ARRAY:
value = list(parameter_value.string_array_value)
elif parameter_value.type == ParameterType.PARAMETER_NOT_SET:
value = None
else:
raise RuntimeError(f'unexpected parameter type {parameter_value.type}')

return value


def parameter_dict_from_yaml_file(
parameter_file: str,
use_wildcard: bool = False,
target_nodes: Optional[list[str]] = None,
namespace: str = ''
) -> dict[str, Parameter]:
"""
Build a dict of parameters from a YAML file.
Will load all parameters if ``target_nodes`` is None or empty.
:raises RuntimeError: if a target node is not in the file
:raises RuntimeError: if the is not a valid ROS parameter file
:param parameter_file: Path to the YAML file to load parameters from.
:param use_wildcard: Use wildcard matching for the target nodes.
:param target_nodes: list of nodes in the YAML file to load parameters from.
:param namespace: Namespace to prepend to all parameters.
:return: A dict of Parameter messages keyed by the parameter names
"""
with open(parameter_file, 'r') as f:
param_file = yaml.safe_load(f)
param_keys = []
param_dict = {}

if use_wildcard and '/**' in param_file:
param_keys.append('/**')

if target_nodes:
for n in target_nodes:
if n not in param_file.keys():
raise RuntimeError(f'Param file does not contain parameters for {n},'
f'only for nodes: {list(param_file.keys())} ')
param_keys.append(n)
else:
# wildcard key must go to the front of param_keys so that
# node-namespaced parameters will override the wildcard parameters
keys = set(param_file.keys())
keys.discard('/**')
param_keys.extend(keys)

if len(param_keys) == 0:
raise RuntimeError('Param file does not contain selected parameters')

for n in param_keys:
value = param_file[n]
if not isinstance(value, dict) or 'ros__parameters' not in value:
raise RuntimeError(f'YAML file is not a valid ROS parameter file for node {n}')
param_dict.update(value['ros__parameters'])
# Modification done here
new_dictionary: dict[str, Parameter] = {}
dictionary = _unpack_parameter_dict(namespace, param_dict)

for key, parameter_msg in dictionary.items():
new_dictionary[key] = Parameter.from_parameter_msg(parameter_msg)
return new_dictionary


def _unpack_parameter_dict(namespace, parameter_dict):
"""
Flatten a parameter dictionary recursively.
:param namespace: The namespace to prepend to the parameter names.
:param parameter_dict: A dictionary of parameters keyed by the parameter names
:return: A dict of Parameter objects keyed by the parameter names
"""
parameters: dict[str, ParameterMsg] = {}
for param_name, param_value in parameter_dict.items():
full_param_name = namespace + param_name
# Unroll nested parameters
if isinstance(param_value, dict):
parameters.update(_unpack_parameter_dict(
namespace=full_param_name + PARAMETER_SEPARATOR_STRING,
parameter_dict=param_value))
else:
parameter = ParameterMsg()
parameter.name = full_param_name
parameter.value = get_parameter_value(str(param_value))
parameters[full_param_name] = parameter
return parameters
Loading

0 comments on commit f510607

Please sign in to comment.