diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7fa2b44..ce67495 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,119 +1,6 @@
cmake_minimum_required(VERSION 2.8.3)
-## Compile as C++11, supported in ROS Kinetic and newer
-# add_compile_options(-std=c++11)
-## Find catkin macros and libraries
-## if COMPONENTS list like find_package(catkin REQUIRED COMPONENTS xyz)
-## is used, also find other catkin packages
find_package(catkin REQUIRED)
-## System dependencies are found with CMake's conventions
-# find_package(Boost REQUIRED COMPONENTS system)
-## Uncomment this if the package has a setup.py. This macro ensures
-## modules and global scripts declared therein get installed
-## See http://ros.org/doc/api/catkin/html/user_guide/setup_dot_py.html
-# catkin_python_setup()
-## Declare ROS messages, services and actions ##
-## Generate messages in the 'msg' folder
-# add_message_files(
-# Message1.msg
-# Message2.msg
-# )
-## Generate services in the 'srv' folder
-# add_service_files(
-# Service1.srv
-# Service2.srv
-# )
-## Generate actions in the 'action' folder
-# add_action_files(
-# Action1.action
-# Action2.action
-# )
-## Generate added messages and services with any dependencies listed here
-# generate_messages(
-# std_msgs # Or other packages containing msgs
-# )
-## Generate dynamic reconfigure parameters in the 'cfg' folder
-# generate_dynamic_reconfigure_options(
-# cfg/DynReconf1.cfg
-# cfg/DynReconf2.cfg
-# )
-## catkin specific configuration ##
-## The catkin_package macro generates cmake config files for your package
-## Declare things to be passed to dependent projects
-## INCLUDE_DIRS: uncomment this if your package contains header files
-## LIBRARIES: libraries you create in this project that dependent projects also need
-## CATKIN_DEPENDS: catkin_packages dependent projects also need
-## DEPENDS: system dependencies of this project that dependent projects also need
-# INCLUDE_DIRS include
-# LIBRARIES ros_kafka_bridge
-# CATKIN_DEPENDS other_catkin_pkg
-# DEPENDS system_lib
-## Build ##
-## Specify additional locations of header files
-## Your package locations should be listed before other locations
-# include
-# ${catkin_INCLUDE_DIRS}
-## Declare a C++ library
-# add_library(${PROJECT_NAME}
-# src/${PROJECT_NAME}/ros_kafka_bridge.cpp
-# )
-## Add cmake target dependencies of the library
-## as an example, code may need to be generated before libraries
-## either from message generation or dynamic reconfigure
-## Declare a C++ executable
-## With catkin_make all packages are built within a single CMake context
-## The recommended prefix ensures that target names across packages don't collide
-# add_executable(${PROJECT_NAME}_node src/ros_kafka_bridge_node.cpp)
-## Rename C++ executable without prefix
-## The above recommended prefix causes long target names, the following renames the
-## target back to the shorter version for ease of user use
-## e.g. "rosrun someones_pkg node" instead of "rosrun someones_pkg someones_pkg_node"
-# set_target_properties(${PROJECT_NAME}_node PROPERTIES OUTPUT_NAME node PREFIX "")
-## Add cmake target dependencies of the executable
-## same as for the library above
-# add_dependencies(${PROJECT_NAME}_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
-## Specify libraries to link a library or executable target against
-# target_link_libraries(${PROJECT_NAME}_node
-# ${catkin_LIBRARIES}
-# )
diff --git a/config/topics.yaml b/config/topics.yaml
index 659042f..c94e646 100644
--- a/config/topics.yaml
+++ b/config/topics.yaml
@@ -1,7 +1,10 @@
- - topic: "/wrist_camera/color/image_raw"
- msg_type: "sensor_msgs/Image"
- - topic: "/tf"
- msg_type: "tf2_mgs/TFMessage"
- - topic: "/joint_state"
- msg_type: "sensor_msgs/JointState"
\ No newline at end of file
+ - msg_type: "sensor_msgs/Image"
+ ros_topic: "/wrist_camera/color/image_raw"
+ kafka_topic: "wrist_camera"
+ - msg_type: "tf2_msgs/TFMessage"
+ ros_topic: "/tf"
+ kafka_topic: "frames"
+ - msg_type: "sensor_msgs/JointState"
+ ros_topic: "/joint_state"
+ kafka_topic: "arm_joints"
\ No newline at end of file
diff --git a/launch/kafka_publish.launch b/launch/kafka_publish.launch
deleted file mode 100644
index 16b4d1c..0000000
--- a/launch/kafka_publish.launch
+++ /dev/null
@@ -1,19 +0,0 @@
diff --git a/launch/kafka_publisher.launch b/launch/kafka_publisher.launch
new file mode 100644
index 0000000..c7a0637
--- /dev/null
+++ b/launch/kafka_publisher.launch
@@ -0,0 +1,15 @@
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..091375c
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+sudo apt install ros-noetic-rospy-message-converter
+pip install kafka-python pyyaml
\ No newline at end of file
diff --git a/scripts/convert_to_kafka.py b/scripts/convert_to_kafka.py
deleted file mode 100755
index 197f665..0000000
--- a/scripts/convert_to_kafka.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/usr/bin/env python
-import yaml
-def load_yaml_to_dict(yaml_file: str, robot_name: str = 'UGV') -> dict:
- with open(yaml_file, 'r') as file:
- yaml_data = yaml.safe_load(file)
- topic_dict = {topic['topic']: topic['msg_type'] for topic in yaml_data[robot_name]}
- return topic_dict
-yaml_file = "../config/topics.yaml"
-topic_dict = load_yaml_to_dict(yaml_file)
-for topic, msg_type in topic_dict.items():
- print(f"topic: {topic}, type: {msg_type}")
\ No newline at end of file
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..4c26163
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,12 @@
+from distutils.core import setup
+from catkin_pkg.python_setup import generate_distutils_setup
+# fetch values from package.xml.
+setup_args = generate_distutils_setup(
+ packages=["ros_kafka_connector"],
+ package_dir={"": "src"}
diff --git a/src/kafka_publish.py b/src/kafka_publish.py
deleted file mode 100755
index 72dc7ab..0000000
--- a/src/kafka_publish.py
+++ /dev/null
@@ -1,61 +0,0 @@
-#!/usr/bin/env python
-import json
-from kafka import KafkaProducer
-from kafka import KafkaConsumer
-import rospy
-from rospy_message_converter import json_message_converter
-from utils import import_msg_type
-class kafka_publish():
- def __init__(self):
- # initialize node
- rospy.init_node("kafka_publish")
- rospy.on_shutdown(self.shutdown)
- # Retrieve parameters from launch file
- bootstrap_server = rospy.get_param("~bootstrap_server", "localhost:9092")
- self.ros_topic = rospy.get_param("~ros_topic", "test")
- self.kafka_topic = rospy.get_param("~kafka_topic", "test")
- self.msg_type = rospy.get_param("~msg_type", "std_msgs/String")
- # Create kafka producer
- self.producer = KafkaProducer(bootstrap_servers=bootstrap_server, value_serializer=lambda m: json.dumps(m).encode('ascii'))
- # ROS does not allow a change in msg type once a topic is created. Therefore the msg
- # type must be imported and specified ahead of time.
- msg_func = import_msg_type(self.msg_type)
- # Subscribe to the topic with the chosen imported message type
- rospy.Subscriber(self.ros_topic, msg_func, self.callback)
- rospy.logwarn("Using {} MSGs from ROS: {} -> KAFKA: {}".format(self.msg_type, self.ros_topic,self.kafka_topic))
- def callback(self, msg):
- # Output msg to ROS and send to Kafka server
- rospy.logwarn("MSG Receved: {}".format(msg))
- json_str = json_message_converter.convert_ros_message_to_json(msg)
- self.producer.send(self.kafka_topic, json_str)
- def run(self):
- rate = rospy.Rate(10)
- while not rospy.is_shutdown():
- rate.sleep()
- def shutdown(self):
- rospy.loginfo("Shutting down")
-if __name__ == "__main__":
- try:
- node = kafka_publish()
- node.run()
- except rospy.ROSInterruptException:
- pass
- rospy.loginfo("Exiting")
diff --git a/src/kafka_publisher.py b/src/kafka_publisher.py
new file mode 100755
index 0000000..1e9e039
--- /dev/null
+++ b/src/kafka_publisher.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+import json
+from kafka import KafkaProducer
+from kafka import KafkaConsumer
+import rospy
+import rospkg
+from rospy_message_converter import json_message_converter
+import utils
+class KafkaPublisher:
+ # TODO need to make kafka topics!!
+ """
+ takes a yaml file with:
+ - ros msg types
+ - ros topic names
+ - kafka topic names
+ subscribes to the ros topic, and converts to json format
+ publishes json msg to the kafka topic
+ """
+ def __init__(self):
+ rospy.init_node("kafka_publisher")
+ rospy.on_shutdown(self.shutdown)
+ self.load_parameters()
+ pkg = rospkg.RosPack()
+ yaml_file = (
+ pkg.get_path("ros_kafka_connector") + "/config/" + self._filename
+ )
+ topics_dict = utils.load_yaml_to_dict(yaml_file, robot_name="UGV")
+ # start kafka producer
+ # self.producer = KafkaProducer(
+ # bootstrap_servers=self._bootstrap_server,
+ # value_serializer=lambda m: json.dumps(m).encode('ascii')
+ # )
+ for msg_type, topics in topics_dict.items():
+ ros_topic = topics["ros_topic"]
+ kafka_topic = topics["kafka_topic"]
+ msg_class = utils.import_msg_type(msg_type)
+ rospy.Subscriber(
+ ros_topic,
+ msg_class,
+ lambda msg: self.callback(msg, kafka_topic),
+ )
+ rospy.loginfo(
+ f"Using {msg_type} from ROS: {ros_topic} -> KAFKA: {kafka_topic}"
+ )
+ def load_parameters(self) -> None:
+ self._filename = rospy.get_param("~topics_filename", "topics.yaml")
+ self._bootstrap_server = rospy.get_param(
+ "~bootstrap_server", "localhost:9092"
+ )
+ self._update_rate = float(rospy.get_param("~update_rate", "10.0"))
+ def callback(self, msg, kafka_topic: str) -> None:
+ """
+ takes msg from ros, converts to json, publishes to kafka
+ :param msg: ros msg from subscriber
+ :param kafka_topic (str): kafka topic name
+ """
+ json_str = json_message_converter.convert_ros_message_to_json(msg)
+ # self.producer.send(kafka_topic, json_str)
+ def run(self):
+ rate = rospy.Rate(self._update_rate)
+ while not rospy.is_shutdown():
+ rate.sleep()
+ def shutdown(self):
+ rospy.loginfo("Shutting down")
+if __name__ == "__main__":
+ try:
+ node = KafkaPublisher()
+ node.run()
+ except rospy.ROSInterruptException:
+ pass
+ rospy.loginfo("Exiting")
diff --git a/src/ros_publish.py b/src/ros_publish.py
index cae4fe4..4bafca4 100755
--- a/src/ros_publish.py
+++ b/src/ros_publish.py
@@ -7,8 +7,8 @@
from rospy_message_converter import json_message_converter
from utils import import_msg_type
-class ros_publish():
+class ros_publish:
def __init__(self):
# initialize node
@@ -16,45 +16,55 @@ def __init__(self):
# Retrieve parameters from launch file
- bootstrap_server = rospy.get_param("~bootstrap_server", "localhost:9092")
+ bootstrap_server = rospy.get_param(
+ "~bootstrap_server", "localhost:9092"
+ )
self.ros_topic = rospy.get_param("~ros_topic", "test")
self.kafka_topic = rospy.get_param("~kafka_topic", "test")
self.msg_type = rospy.get_param("~msg_type", "std_msgs/String")
# Create kafka consumer
- self.consumer = KafkaConsumer(self.kafka_topic,
- bootstrap_servers=bootstrap_server,
- value_deserializer=lambda m: json.loads(m.decode('ascii')),
- auto_offset_reset='latest',
- consumer_timeout_ms=5000)
+ self.consumer = KafkaConsumer(
+ self.kafka_topic,
+ bootstrap_servers=bootstrap_server,
+ value_deserializer=lambda m: json.loads(m.decode("ascii")),
+ auto_offset_reset="latest",
+ consumer_timeout_ms=5000,
+ )
# Import msg type
msg_func = import_msg_type(self.msg_type)
- # Subscribe to ROS topic of interest
- self.publisher = rospy.Publisher(self.ros_topic, msg_func, queue_size=10)
- rospy.logwarn("Using {} MSGs from KAFKA: {} -> ROS: {}".format(self.msg_type, self.kafka_topic,self.ros_topic))
+ # Subscribe to ROS topic of interest
+ self.publisher = rospy.Publisher(
+ self.ros_topic, msg_func, queue_size=10
+ )
+ rospy.logwarn(
+ "Using {} MSGs from KAFKA: {} -> ROS: {}".format(
+ self.msg_type, self.kafka_topic, self.ros_topic
+ )
+ )
def run(self):
while not rospy.is_shutdown():
for msg in self.consumer:
# Convert Kafka message to JSON string
json_str = json.dumps(msg.value)
# Convert JSON to ROS message
- ros_msg = json_message_converter.convert_json_to_ros_message(self.msg_type, json_str)
+ ros_msg = json_message_converter.convert_json_to_ros_message(
+ self.msg_type, json_str
+ )
# Publish to ROS topic
rospy.logwarn("Received MSG: {}".format(json_str))
- rospy.spin()
+ rospy.spin()
def shutdown(self):
rospy.loginfo("Shutting down")
if __name__ == "__main__":
diff --git a/src/topics.yaml b/src/topics.yaml
new file mode 100644
index 0000000..9bc7dcf
--- /dev/null
+++ b/src/topics.yaml
@@ -0,0 +1,10 @@
+ - msg_type: "sensor_msgs/Image"
+ ros_topic: "/wrist_camera/color/image_raw"
+ kafka_topic: "wrist_camera"
+ - msg_type: "tf2_mgs/TFMessage"
+ ros_topic: "/tf"
+ kafka_topic: "frames"
+ - msg_type: "sensor_msgs/JointState"
+ ros_topic: "/joint_state"
+ kafka_topic: "arm_joints"
\ No newline at end of file
diff --git a/src/utils.py b/src/utils.py
index 05d646c..5ee5cba 100755
--- a/src/utils.py
+++ b/src/utils.py
@@ -1,74 +1,148 @@
#!/usr/bin/env python
+import rospy
+import yaml
For custom msg types, need to build pkg in workspace and then add here
-def import_msg_type(msg_type):
- # Adding a new msg type is as easy as including an import and updating the variable
+def load_yaml_to_dict(yaml_file: str, robot_name: str = "UGV") -> dict:
+ """
+ takes yaml file and converts to dict which includes topic names & msg types
+ :param yaml_file (str): path to yaml file
+ :param robot_name(str, optional): first line of yaml file, default -> 'UGV'
+ :return topic_dict (dict): dictionary mapping topics & msg types
+ example: topic_list.yaml
+ robot_name:
+ - msg_type: "std_msgs/String"
+ ros_topic: "/string"
+ kafka_topic: "string"
+ - msg_type: "geometry_msgs/Pose"
+ ros_topic: "/pose"
+ kafka_topic: "pose"
+ topic_dict: {
+ 'std_msgs/String': {'/string', 'string'},
+ 'geometry_msgs/Pose' : {'/pose', 'pose'}
+ }
+ """
+ with open(yaml_file, "r") as file:
+ yaml_data = yaml.safe_load(file)
+ topics_dict = {}
+ for topic in yaml_data[robot_name]:
+ msg_type = topic["msg_type"]
+ ros_topic = topic["ros_topic"]
+ kafka_topic = topic["kafka_topic"]
+ topics_dict[msg_type] = {
+ "ros_topic": ros_topic,
+ "kafka_topic": kafka_topic,
+ }
+ return topics_dict
+def import_msg_type(msg_type: str):
+ """
+ takes a ros msg_type and dynamically imports the msg type and returns it
+ :params msg_type (str): the string identifier for the ROS msg type
+ :return subscriber_type (class): the corresponding ROS msg class
+ :raises ValueError: if msg_type is not found
+ """
if msg_type == "std_msgs/String":
from std_msgs.msg import String
subscriber_msg = String
- if msg_type == "std_msgs/Bool":
+ elif msg_type == "std_msgs/Bool":
from std_msgs.msg import Bool
subscriber_msg = Bool
- if msg_type == "std_msgs/Empty":
+ elif msg_type == "std_msgs/Empty":
from std_msgs.msg import Empty
- subscriber_msg = Bool
+ subscriber_msg = Empty
elif msg_type == "geometry_msgs/Twist":
from geometry_msgs.msg import Twist
subscriber_msg = Twist
elif msg_type == "geometry_msgs/Pose":
from geometry_msgs.msg import Pose
subscriber_msg = Pose
elif msg_type == "geometry_msgs/PoseArray":
from geometry_msgs.msg import PoseArray
subscriber_msg = PoseArray
elif msg_type == "geometry_msgs/PoseStamped":
from geometry_msgs.msg import PoseStamped
subscriber_msg = PoseStamped
elif msg_type == "geometry_msgs/PoseWithCovariance":
from geometry_msgs.msg import PoseWithCovariance
subscriber_msg = PoseWithCovariance
elif msg_type == "geometry_msgs/PoseWithCovarianceStamped":
from geometry_msgs.msg import PoseWithCovarianceStamped
subscriber_msg = PoseWithCovarianceStamped
elif msg_type == "geometry_msgs/Vector3":
from geometry_msgs.msg import Vector3
subscriber_msg = Vector3
- elif msg_type == "sensors_msgs/Image":
+ elif msg_type == "sensor_msgs/Image":
from sensor_msgs.msg import Image
subscriber_msg = Image
elif msg_type == "sensor_msgs/LaserScan":
from sensor_msgs.msg import LaserScan
subscriber_msg = LaserScan
elif msg_type == "sensor_msgs/BatteryState":
from sensor_msgs.msg import BatteryState
subscriber_msg = BatteryState
elif msg_type == "sensor_msgs/Imu":
from sensor_msgs.msg import Imu
subscriber_msg = Imu
elif msg_type == "sensor_msgs/PointCloud2":
from sensor_msgs.msg import PointCloud2
subscriber_msg = PointCloud2
elif msg_type == "sensor_msgs/JointState":
from sensor_msgs.msg import JointState
subscriber_msg = JointState
elif msg_type == "sensor_msgs/NavSatFix":
from sensor_msgs.msg import NavSatFix
subscriber_msg = NavSatFix
elif msg_type == "nav_msgs/Odometry":
from nav_msgs.msg import Odometry
subscriber_msg = Odometry
elif msg_type == "nav_msgs/OccupancyGrid":
from nav_msgs.msg import OccupancyGrid
subscriber_msg = OccupancyGrid
elif msg_type == "actionlib_msgs/GoalStatus":
from actionlib_msgs.msg import GoalStatus
subscriber_msg = GoalStatus
+ elif msg_type == "tf2_msgs/TFMessage":
+ from tf2_msgs.msg import TFMessage
+ subscriber_msg = TFMessage
- raise ValueError("MSG NOT SUPPORTED: Only String/Twist/Image are currently supported. \
- Please add imports to utils.py for specific msg type.")
+ raise ValueError(
+ f'MSG "{msg_type}" IS NOT SUPPORTED \nPlease add imports to utils.py for specific msg type.'
+ )
return subscriber_msg