diff --git a/CMakeLists.txt b/CMakeLists.txt index ce67495..48d4aa3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,4 +3,13 @@ project(ros_kafka_connector) find_package(catkin REQUIRED) catkin_python_setup() -catkin_package() +catkin_package( + CATKIN_DEPENDS + rospy + std_msgs + geometry_msgs + sensor_msgs + tf2_msgs + heron_msgs + robotnik_msgs +) diff --git a/README.md b/README.md index 0c393b1..3269f90 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,8 @@ topic_dict: { ``` Message types that are added are in the [`utils.py`](https://github.com/ethz-asl/ros-kafka-connector/blob/master/src/utils.py) file. To add new message types to the converter, add cases to `import_msg_type` function. If you have custom messages from ROS, you need to make them callable in your ros workspace. +This package does depend on both [`robotnik_msgs`](https://github.com/RobotnikAutomation/robotnik_msgs/tree/ros-devel) and [`heron_msgs`](https://github.com/RobotnikAutomation/heron_msgs). Make sure to add these packages to your catkin ws. + | Parameter | Info | Default | | ------------- |:-------------:| -----:| | bootstrap_server | IP of kafka server | "localhost:9092" | diff --git a/config/heron_pilot.yaml b/config/heron_pilot.yaml new file mode 100644 index 0000000..bbc60e0 --- /dev/null +++ b/config/heron_pilot.yaml @@ -0,0 +1,19 @@ +UGV: + - msg_type: "sensor_msgs/Image" + ros_topic: "/robot/arm_camera/front_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.arm-camera.rgb.image-raw" + - msg_type: "sensor_msgs/Image" + ros_topic: "/robot/body_camera/front_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.body-camera.rgb.image-raw" + - msg_type: "sensor_msgs/NavSatFix" + ros_topic: "/robot/gps/fix" + kafka_topic: "ugv.gps-fix" + - msg_type: "robotnik_msgs/BatteryStatus" + ros_topic: "/robot/battery_estimator/data" + kafka_topic: "ugv.battery-status" + - msg_type: "robotnik_msgs/State" + ros_topic: "/robot/robot_local_control/RobotStatusComponent/state" + kafka_topic: "ugv.status" + - msg_type: "std_msgs/String" + ros_topic: "/hlp/state" + kafka_topic: "ugv.hlp.status" diff --git a/config/pothole_test.yaml b/config/pothole_test.yaml new file mode 100644 index 0000000..6ffd27f --- /dev/null +++ b/config/pothole_test.yaml @@ -0,0 +1,7 @@ +UGV: + - msg_type: "sensor_msgs/Image" + ros_topic: "/wrist_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.image.arm" + - msg_type: "sensor_msgs/Image" + ros_topic: "/front_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.image.body" diff --git a/config/topics.yaml b/config/topics.yaml index c94e646..85caa0a 100644 --- a/config/topics.yaml +++ b/config/topics.yaml @@ -1,10 +1,13 @@ UGV: - - msg_type: "sensor_msgs/Image" - ros_topic: "/wrist_camera/color/image_raw" - kafka_topic: "wrist_camera" - - msg_type: "tf2_msgs/TFMessage" - ros_topic: "/tf" - kafka_topic: "frames" - - msg_type: "sensor_msgs/JointState" - ros_topic: "/joint_state" - kafka_topic: "arm_joints" \ No newline at end of file + - msg_type: "robotnik_msgs/BatteryStatus" + ros_topic: "/robot/battery_estimator/data" + kafka_topic: "ugv.battery.status" + - msg_type: "sensor_msgs/NavSatFix" + ros_topic: "/robot/gps/fix" + kafka_topic: "ugv.gps.fix" + - msg_type: "robotnik_msgs/State" + ros_topic: "/robot/robot_local_control/RobotStatusComponent/state" + kafka_topic: "ugv.state" + - msg_type: "std_msgs/String" + ros_topic: "/hlp/state" + kafka_topic: "hlp.state" diff --git a/launch/kafka_create_topics.launch b/launch/kafka_create_topics.launch new file mode 100644 index 0000000..ddd4d9d --- /dev/null +++ b/launch/kafka_create_topics.launch @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + + + + + + diff --git a/package.xml b/package.xml index 82121a3..b79e3b4 100644 --- a/package.xml +++ b/package.xml @@ -9,13 +9,19 @@ MIT catkin - roscpp - rospy - std_msgs - geometry_msgs - sensor_msgs - nav_msgs - tf2_msgs + + rospy + std_msgs + geometry_msgs + sensor_msgs + nav_msgs + tf2_msgs + + + heron_msgs + robotnik_msgs + heron_msgs + robotnik_msgs rospy_message_converter diff --git a/scripts/image_consumer.py b/scripts/image_consumer.py old mode 100644 new mode 100755 index ddfbd6b..b5b153c --- a/scripts/image_consumer.py +++ b/scripts/image_consumer.py @@ -37,7 +37,7 @@ def consume_images(kafka_topic: str, kafka_server: str): parser = argparse.ArgumentParser() - parser.add_argument("-t", "--topic", default="heron.sim-test.wrist-camera", help="Kafka topic name") + parser.add_argument("-t", "--topic", default="test.image.stream", help="Kafka topic name") parser.add_argument("-s", "--server", default="10.2.0.8:9092", help="Kafka bootstrap server") args = parser.parse_args() diff --git a/setup.py b/setup.py index 4c26163..8527a83 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ # fetch values from package.xml. setup_args = generate_distutils_setup( - packages=["ros_kafka_connector"], + packages=["ros_kafka_connector"], package_dir={"": "src"} ) diff --git a/src/kafka_create_topics.py b/src/kafka_create_topics.py new file mode 100755 index 0000000..8f8d380 --- /dev/null +++ b/src/kafka_create_topics.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python + +from cv_bridge import CvBridge +from confluent_kafka.admin import AdminClient, NewTopic +import rospy +import rospkg +import utils + + +class KafkaCreateTopics: + """ + takes a yaml file with: + - ros msg types + - ros topic names + - kafka topic names + + creates kafka topics + + """ + + def __init__(self): + + rospy.init_node("kafka_topic_creator") + + self.load_parameters() + pkg = rospkg.RosPack() + yaml_file = ( + pkg.get_path("ros_kafka_connector") + "/config/" + self._filename + ) + self.bridge = CvBridge() + + topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) + + # initialise admin client to create topics + self.admin_client = AdminClient( + {"bootstrap.servers": self._bootstrap_server} + ) + + try: + self.admin_client.list_topics(timeout=5) + rospy.loginfo("Kafka connection successful.") + except Exception as err: + rospy.logerr(f"Failed to connect to Kafka: {err}") + rospy.signal_shutdown("Kafka connection failed.") + + self.create_kafka_topics(topics_dict) + + def load_parameters(self) -> None: + self._filename = rospy.get_param("~topics_filename", "topics.yaml") + self._bootstrap_server = rospy.get_param( + "~bootstrap_server", "10.2.0.8:9092" + ) + self._security_protocol = rospy.get_param( + "~security_protocol", "PLAINTEXT" + ) + self._update_rate = float(rospy.get_param("~update_rate", "10.0")) + self._robot_name = rospy.get_param("~robot_name", "UGV") + + def create_kafka_topics(self, topics_dict: dict) -> None: + """ + creates kafka topics based on config + + :param topics_dict (dict): dictionary of kafka & ros topics + """ + kafka_topics = [ + topics["kafka_topic"] for topics in topics_dict.values() + ] + + # check topic doesn't already exist + existing_topics = self.admin_client.list_topics().topics.keys() + new_topics = [ + NewTopic(topic, num_partitions=1, replication_factor=1) + for topic in kafka_topics + if topic not in existing_topics + ] + + if new_topics: + rospy.loginfo( + f"Creating kafka topic {[t.topic for t in new_topics]}" + ) + futures = self.admin_client.create_topics(new_topics) + + for topic, future in futures.items(): + try: + future.result() # wait for op to finish + rospy.loginfo(f"Kafka topic '{topic}' created sucessfully!") + except Exception as err: + rospy.logerr(f"Failed to create topic '{topic}' : {err}") + + else: + rospy.logerr("All kafka topics already exist.") + + +if __name__ == "__main__": + + try: + node = KafkaCreateTopics() + except rospy.ROSInterruptException: + pass + + rospy.loginfo("Exiting") diff --git a/src/kafka_image_publisher.py b/src/kafka_image_publisher.py new file mode 100755 index 0000000..e1889e1 --- /dev/null +++ b/src/kafka_image_publisher.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python + +import json +import base64 +import cv2 +from cv_bridge import CvBridge +from kafka import KafkaProducer, KafkaConsumer +from confluent_kafka.admin import AdminClient, NewTopic +import rospy +import rospkg +from rospy_message_converter import json_message_converter +from sensor_msgs.msg import Image +from std_srvs.srv import Empty + +from heron_msgs.srv import ( + SendImageToKafka, + SendImageToKafkaRequest, + SendImageToKafkaResponse, +) + +class KafkaImagePublisher: + """ + sends ROS images to kafka with ROS srv call + images in kafka are in the Base64 type. + + services will send an image before and after image processing + e.g. pothole detection + + hardcoded ros topics & kafka topics + kafka topics should already be created + """ + def __init__(self): + + rospy.init_node("kafka_image_publisher") + + self.load_parameters() + self.bridge = CvBridge() + + # start kafka producer + self.producer = KafkaProducer( + bootstrap_servers=self.bootstrap_server_, + security_protocol=self.security_protocol_, + value_serializer=lambda m: json.dumps(m).encode("ascii"), + ) + + self.bridge = CvBridge() # start bridge to convert images + + # services for before/after image processing + self.body_img_srv = rospy.Service( + "/kafka/publish_body_image", + SendImageToKafka, + self.send_body_image_cb, + ) + self.arm_img_srv = rospy.Service( + "/kafka/publish_arm_image", SendImageToKafka, self.send_arm_image_cb + ) + + rospy.spin() + + def load_parameters(self) -> None: + self.filename_ = rospy.get_param("~topics_filename", "topics.yaml") + self.bootstrap_server_ = rospy.get_param( + "~bootstrap_server", "10.2.0.8:9092" + ) + self.security_protocol_ = rospy.get_param( + "~security_protocol", "PLAINTEXT" + ) + self.update_rate_ = float(rospy.get_param("~update_rate", "10.0")) + self.rate_ = rospy.Rate(self.update_rate_) + self.robot_name_ = rospy.get_param("~robot_name", "UGV") + self.body_ros_topic_ = "/front_rgbd_camera/rgb/image_raw" + self.arm_ros_topic_ = "/wrist_rgbd_camera/rgb/image_raw" + self.body_kafka_topic_ = "ugv.image.body" + self.arm_kafka_topic_ = "ugv.image.arm" + + def send_body_image_cb( + self, req: SendImageToKafkaRequest + ) -> SendImageToKafkaResponse: + + return self.send_to_kafka( + self.body_kafka_topic_, req.message, req.image + ) + + def send_arm_image_cb( + self, req: SendImageToKafkaRequest + ) -> SendImageToKafkaResponse: + + return self.send_to_kafka(self.arm_kafka_topic_, req.message, req.image) + + def send_to_kafka(self, kafka_topic: str, msg: str, img: Image) -> SendImageToKafkaResponse: + try: + cv_image = self.bridge.imgmsg_to_cv2( + img, desired_encoding="passthrough" + ) + _, buffer = cv2.imencode( + ".jpg", cv_image, [cv2.IMWRITE_JPEG_QUALITY, 50] + ) + base64_image = base64.b64encode(buffer).decode("utf-8") + + # Create a json message + json_message = {"message": msg, "image_data": base64_image} + rospy.loginfo(f"Encoded image to Base64 for topic {kafka_topic}") + self.producer.send(kafka_topic, json_message) + return SendImageToKafkaResponse(success=True) + except Exception as err: + rospy.logerr( + f"Failed to process image message for topic {kafka_topic}: {err}" + ) + return SendImageToKafkaResponse(success=False) + + +if __name__ == "__main__": + + try: + node = KafkaImagePublisher() + except rospy.ROSInterruptException: + pass + + rospy.loginfo("Exiting") diff --git a/src/kafka_publisher.py b/src/kafka_publisher.py index 7980c65..73d9545 100755 --- a/src/kafka_publisher.py +++ b/src/kafka_publisher.py @@ -1,20 +1,19 @@ #!/usr/bin/env python -import json -import base64 -import cv2 -from cv_bridge import CvBridge -from kafka import KafkaProducer, KafkaConsumer -from confluent_kafka.admin import AdminClient, NewTopic import rospy import rospkg +import json + +from typing import Dict, Any +from kafka import KafkaProducer +from confluent_kafka.admin import AdminClient, NewTopic from rospy_message_converter import json_message_converter from sensor_msgs.msg import Image + import utils class KafkaPublisher: - # TODO need to make kafka topics!! """ takes a yaml file with: - ros msg types @@ -26,7 +25,7 @@ class KafkaPublisher: """ - def __init__(self): + def __init__(self) -> None: rospy.init_node("kafka_publisher") rospy.on_shutdown(self.shutdown) @@ -36,9 +35,8 @@ def __init__(self): yaml_file = ( pkg.get_path("ros_kafka_connector") + "/config/" + self._filename ) - self.bridge = CvBridge() - topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) + self.topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) # initialise admin client to create topics self.admin_client = AdminClient( @@ -52,7 +50,7 @@ def __init__(self): rospy.logerr(f"Failed to connect to Kafka: {err}") rospy.signal_shutdown("Kafka connection failed.") - self.create_kafka_topics(topics_dict) + self.create_kafka_topics(self.topics_dict) # start kafka producer self.producer = KafkaProducer( @@ -61,28 +59,24 @@ def __init__(self): value_serializer=lambda m: json.dumps(m).encode("ascii"), ) - for msg_type, topics in topics_dict.items(): - ros_topic = topics["ros_topic"] - kafka_topic = topics["kafka_topic"] + # create topic storage for the latest messages + self.latest_msgs = { + details["ros_topic"]: None for details in self.topics_dict.values() + } + + # subscribers for all topics + for msg_type, details in self.topics_dict.items(): + ros_topic = details["ros_topic"] msg_class = utils.import_msg_type(msg_type) - if msg_class == Image: - rospy.Subscriber( - ros_topic, - msg_class, - lambda msg, kafka_topic=kafka_topic: self.image_callback(msg, kafka_topic), - ) - rospy.loginfo( - f"Subscribing to Image topic: {ros_topic} -> KAFKA: {kafka_topic}" - ) - else: - rospy.Subscriber( - ros_topic, - msg_class, - lambda msg, kafka_topic=kafka_topic: self.callback(msg, kafka_topic), - ) - rospy.loginfo( - f"Using {msg_type} from ROS: {ros_topic} -> KAFKA: {kafka_topic}" - ) + rospy.Subscriber( + ros_topic, + msg_class, + self.message_callback, + callback_args=ros_topic, + ) + rospy.loginfo(f"Subscribed to ROS topic: {ros_topic}") + + self.run() def load_parameters(self) -> None: self._filename = rospy.get_param("~topics_filename", "topics.yaml") @@ -95,7 +89,7 @@ def load_parameters(self) -> None: self._update_rate = float(rospy.get_param("~update_rate", "10.0")) self._robot_name = rospy.get_param("~robot_name", "UGV") - def create_kafka_topics(self, topics_dict: dict) -> None: + def create_kafka_topics(self, topics_dict: Dict[str, Dict[str, str]]) -> None: """ creates kafka topics based on config @@ -129,46 +123,44 @@ def create_kafka_topics(self, topics_dict: dict) -> None: else: rospy.logerr("All kafka topics already exist.") - def image_callback(self, msg, kafka_topic: str) -> None: - try: - # convert ros image to compressed jpeg and base64 encode - cv_image = self.bridge.imgmsg_to_cv2( - msg, desired_encoding="passthrough" - ) - _, buffer = cv2.imencode( - ".jpg", cv_image, [cv2.IMWRITE_JPEG_QUALITY, 50] - ) - base64_image = base64.b64encode(buffer).decode("utf-8") - - # Create a json message - json_message = {"topic": kafka_topic, "image_data": base64_image} - rospy.loginfo(f"Encoded image to Base64 for topic {kafka_topic}") - self.producer.send(kafka_topic, json_message) - except Exception as err: - rospy.logerr( - f"Failed to process image message for topic {kafka_topic}: {err}" - ) - - def callback(self, msg, kafka_topic: str) -> None: + def message_callback(self, msg: Any, ros_topic: str) -> None: """ - takes msg from ros, converts to json, publishes to kafka - - :param msg: ros msg from subscriber - :param kafka_topic (str): kafka topic name - + stores latest ros msg """ - rospy.loginfo(f"Received message on ROS topic -> {kafka_topic}") + self.latest_msgs[ros_topic] = msg - json_str = json_message_converter.convert_ros_message_to_json(msg) + def publish_to_kafka(self) -> None: + """ + publish the latest messages to their respective Kafka topics. + """ + for msg_type, details in self.topics_dict.items(): + ros_topic = details["ros_topic"] + kafka_topic = details["kafka_topic"] + msg = self.latest_msgs[ros_topic] + + if msg is None: + continue # skip if no message has been received yet + + try: + # convert messages to JSON + json_message = ( + json_message_converter.convert_ros_message_to_json(msg) + ) - self.producer.send(kafka_topic, json_str) + self.producer.send(kafka_topic, json_message) + rospy.loginfo(f"Published to Kafka topic: {kafka_topic}") + except Exception as e: + rospy.logerr( + f"Failed to publish message from {ros_topic} to {kafka_topic}: {e}" + ) - def run(self): + def run(self) -> None: rate = rospy.Rate(self._update_rate) while not rospy.is_shutdown(): + self.publish_to_kafka() rate.sleep() - def shutdown(self): + def shutdown(self) -> None: rospy.loginfo("Shutting down") diff --git a/src/utils.py b/src/utils.py index 5ee5cba..34140a5 100755 --- a/src/utils.py +++ b/src/utils.py @@ -13,7 +13,7 @@ def load_yaml_to_dict(yaml_file: str, robot_name: str = "UGV") -> dict: takes yaml file and converts to dict which includes topic names & msg types :param yaml_file (str): path to yaml file - :param robot_name(str, optional): first line of yaml file, default -> 'UGV' + :param robot_name(str, optional): first line of yaml file, default -> 'UGV' :return topic_dict (dict): dictionary mapping topics & msg types example: topic_list.yaml @@ -51,7 +51,7 @@ def load_yaml_to_dict(yaml_file: str, robot_name: str = "UGV") -> dict: def import_msg_type(msg_type: str): """ takes a ros msg_type and dynamically imports the msg type and returns it - + :params msg_type (str): the string identifier for the ROS msg type :return subscriber_type (class): the corresponding ROS msg class :raises ValueError: if msg_type is not found @@ -140,6 +140,14 @@ def import_msg_type(msg_type: str): from tf2_msgs.msg import TFMessage subscriber_msg = TFMessage + elif msg_type == "robotnik_msgs/BatteryStatus": + from robotnik_msgs.msg import BatteryStatus + + subscriber_msg = BatteryStatus + elif msg_type == "robotnik_msgs/State": + from robotnik_msgs.msg import State + + subscriber_msg = State else: raise ValueError( f'MSG "{msg_type}" IS NOT SUPPORTED \nPlease add imports to utils.py for specific msg type.'