From 91efdd793a1f283381c9b80ccd6ac10711cf9463 Mon Sep 17 00:00:00 2001 From: Lucy Harris Date: Wed, 15 Jan 2025 18:37:51 +0100 Subject: [PATCH] image pub now srv, adding defined kafka defs for IMS --- CMakeLists.txt | 11 ++- config/heron_pilot.yaml | 19 ++++++ config/pothole_test.yaml | 7 ++ config/topics.yaml | 21 +++--- launch/kafka_create_topics.launch | 19 ++++++ package.xml | 20 ++++-- scripts/image_consumer.py | 2 +- src/kafka_create_topics.py | 107 +++++++++++++++++++++++++++++ src/kafka_image_publisher.py | 109 ++++++++++++++++++++++++++++++ src/kafka_publisher.py | 99 +++++++++++++-------------- src/utils.py | 8 +++ 11 files changed, 351 insertions(+), 71 deletions(-) create mode 100644 config/heron_pilot.yaml create mode 100644 config/pothole_test.yaml create mode 100644 launch/kafka_create_topics.launch create mode 100755 src/kafka_create_topics.py create mode 100755 src/kafka_image_publisher.py diff --git a/CMakeLists.txt b/CMakeLists.txt index ce67495..48d4aa3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,4 +3,13 @@ project(ros_kafka_connector) find_package(catkin REQUIRED) catkin_python_setup() -catkin_package() +catkin_package( + CATKIN_DEPENDS + rospy + std_msgs + geometry_msgs + sensor_msgs + tf2_msgs + heron_msgs + robotnik_msgs +) diff --git a/config/heron_pilot.yaml b/config/heron_pilot.yaml new file mode 100644 index 0000000..bbc60e0 --- /dev/null +++ b/config/heron_pilot.yaml @@ -0,0 +1,19 @@ +UGV: + - msg_type: "sensor_msgs/Image" + ros_topic: "/robot/arm_camera/front_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.arm-camera.rgb.image-raw" + - msg_type: "sensor_msgs/Image" + ros_topic: "/robot/body_camera/front_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.body-camera.rgb.image-raw" + - msg_type: "sensor_msgs/NavSatFix" + ros_topic: "/robot/gps/fix" + kafka_topic: "ugv.gps-fix" + - msg_type: "robotnik_msgs/BatteryStatus" + ros_topic: "/robot/battery_estimator/data" + kafka_topic: "ugv.battery-status" + - msg_type: "robotnik_msgs/State" + ros_topic: "/robot/robot_local_control/RobotStatusComponent/state" + kafka_topic: "ugv.status" + - msg_type: "std_msgs/String" + ros_topic: "/hlp/state" + kafka_topic: "ugv.hlp.status" diff --git a/config/pothole_test.yaml b/config/pothole_test.yaml new file mode 100644 index 0000000..6ffd27f --- /dev/null +++ b/config/pothole_test.yaml @@ -0,0 +1,7 @@ +UGV: + - msg_type: "sensor_msgs/Image" + ros_topic: "/wrist_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.image.arm" + - msg_type: "sensor_msgs/Image" + ros_topic: "/front_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.image.body" diff --git a/config/topics.yaml b/config/topics.yaml index c94e646..85caa0a 100644 --- a/config/topics.yaml +++ b/config/topics.yaml @@ -1,10 +1,13 @@ UGV: - - msg_type: "sensor_msgs/Image" - ros_topic: "/wrist_camera/color/image_raw" - kafka_topic: "wrist_camera" - - msg_type: "tf2_msgs/TFMessage" - ros_topic: "/tf" - kafka_topic: "frames" - - msg_type: "sensor_msgs/JointState" - ros_topic: "/joint_state" - kafka_topic: "arm_joints" \ No newline at end of file + - msg_type: "robotnik_msgs/BatteryStatus" + ros_topic: "/robot/battery_estimator/data" + kafka_topic: "ugv.battery.status" + - msg_type: "sensor_msgs/NavSatFix" + ros_topic: "/robot/gps/fix" + kafka_topic: "ugv.gps.fix" + - msg_type: "robotnik_msgs/State" + ros_topic: "/robot/robot_local_control/RobotStatusComponent/state" + kafka_topic: "ugv.state" + - msg_type: "std_msgs/String" + ros_topic: "/hlp/state" + kafka_topic: "hlp.state" diff --git a/launch/kafka_create_topics.launch b/launch/kafka_create_topics.launch new file mode 100644 index 0000000..ddd4d9d --- /dev/null +++ b/launch/kafka_create_topics.launch @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + + + + + + diff --git a/package.xml b/package.xml index 82121a3..b79e3b4 100644 --- a/package.xml +++ b/package.xml @@ -9,13 +9,19 @@ MIT catkin - roscpp - rospy - std_msgs - geometry_msgs - sensor_msgs - nav_msgs - tf2_msgs + + rospy + std_msgs + geometry_msgs + sensor_msgs + nav_msgs + tf2_msgs + + + heron_msgs + robotnik_msgs + heron_msgs + robotnik_msgs rospy_message_converter diff --git a/scripts/image_consumer.py b/scripts/image_consumer.py index 6fa7dc1..b5b153c 100755 --- a/scripts/image_consumer.py +++ b/scripts/image_consumer.py @@ -37,7 +37,7 @@ def consume_images(kafka_topic: str, kafka_server: str): parser = argparse.ArgumentParser() - parser.add_argument("-t", "--topic", default="ugv.image.body", help="Kafka topic name") + parser.add_argument("-t", "--topic", default="test.image.stream", help="Kafka topic name") parser.add_argument("-s", "--server", default="10.2.0.8:9092", help="Kafka bootstrap server") args = parser.parse_args() diff --git a/src/kafka_create_topics.py b/src/kafka_create_topics.py new file mode 100755 index 0000000..725ec50 --- /dev/null +++ b/src/kafka_create_topics.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python + +import json +import base64 +import cv2 +from cv_bridge import CvBridge +from kafka import KafkaProducer, KafkaConsumer +from confluent_kafka.admin import AdminClient, NewTopic +import rospy +import rospkg +from rospy_message_converter import json_message_converter +from sensor_msgs.msg import Image +import utils + + +class KafkaCreateTopics: + """ + takes a yaml file with: + - ros msg types + - ros topic names + - kafka topic names + + subscribes to the ros topic, and converts to json format + publishes json msg to the kafka topic + + """ + + def __init__(self): + + rospy.init_node("kafka_topic_creator") + + self.load_parameters() + pkg = rospkg.RosPack() + yaml_file = ( + pkg.get_path("ros_kafka_connector") + "/config/" + self._filename + ) + self.bridge = CvBridge() + + topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) + + # initialise admin client to create topics + self.admin_client = AdminClient( + {"bootstrap.servers": self._bootstrap_server} + ) + + try: + self.admin_client.list_topics(timeout=5) + rospy.loginfo("Kafka connection successful.") + except Exception as err: + rospy.logerr(f"Failed to connect to Kafka: {err}") + rospy.signal_shutdown("Kafka connection failed.") + + self.create_kafka_topics(topics_dict) + + def load_parameters(self) -> None: + self._filename = rospy.get_param("~topics_filename", "topics.yaml") + self._bootstrap_server = rospy.get_param( + "~bootstrap_server", "10.2.0.8:9092" + ) + self._security_protocol = rospy.get_param( + "~security_protocol", "PLAINTEXT" + ) + self._update_rate = float(rospy.get_param("~update_rate", "10.0")) + self._robot_name = rospy.get_param("~robot_name", "UGV") + + def create_kafka_topics(self, topics_dict: dict) -> None: + """ + creates kafka topics based on config + + :param topics_dict (dict): dictionary of kafka & ros topics + """ + kafka_topics = [ + topics["kafka_topic"] for topics in topics_dict.values() + ] + + # check topic doesn't already exist + existing_topics = self.admin_client.list_topics().topics.keys() + new_topics = [ + NewTopic(topic, num_partitions=1, replication_factor=1) + for topic in kafka_topics + if topic not in existing_topics + ] + + if new_topics: + rospy.loginfo( + f"Creating kafka topic {[t.topic for t in new_topics]}" + ) + futures = self.admin_client.create_topics(new_topics) + + for topic, future in futures.items(): + try: + future.result() # wait for op to finish + rospy.loginfo(f"Kafka topic '{topic}' created sucessfully!") + except Exception as err: + rospy.logerr(f"Failed to create topic '{topic}' : {err}") + + else: + rospy.logerr("All kafka topics already exist.") + +if __name__ == "__main__": + + try: + node = KafkaCreateTopics() + except rospy.ROSInterruptException: + pass + + rospy.loginfo("Exiting") diff --git a/src/kafka_image_publisher.py b/src/kafka_image_publisher.py new file mode 100755 index 0000000..3f3e5a3 --- /dev/null +++ b/src/kafka_image_publisher.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python + +import json +import base64 +import cv2 +from cv_bridge import CvBridge +from kafka import KafkaProducer, KafkaConsumer +from confluent_kafka.admin import AdminClient, NewTopic +import rospy +import rospkg +from rospy_message_converter import json_message_converter +from sensor_msgs.msg import Image +from std_srvs.srv import Empty + +from heron_msgs.srv import ( + SendImageToKafka, + SendImageToKafkaRequest, + SendImageToKafkaResponse, +) + +class KafkaImagePublisher: + + def __init__(self): + + rospy.init_node("kafka_image_publisher") + + self.load_parameters() + self.bridge = CvBridge() + + # start kafka producer + self.producer = KafkaProducer( + bootstrap_servers=self.bootstrap_server_, + security_protocol=self.security_protocol_, + value_serializer=lambda m: json.dumps(m).encode("ascii"), + ) + + self.bridge = CvBridge() + + self.body_img_srv = rospy.Service( + "/kafka/publish_body_image", + SendImageToKafka, + self.send_body_image_cb, + ) + self.arm_img_srv = rospy.Service( + "/kafka/publish_arm_image", SendImageToKafka, self.send_arm_image_cb + ) + + rospy.spin() + + def load_parameters(self) -> None: + self.filename_ = rospy.get_param("~topics_filename", "topics.yaml") + self.bootstrap_server_ = rospy.get_param( + "~bootstrap_server", "10.2.0.8:9092" + ) + self.security_protocol_ = rospy.get_param( + "~security_protocol", "PLAINTEXT" + ) + self.update_rate_ = float(rospy.get_param("~update_rate", "10.0")) + self.rate_ = rospy.Rate(self.update_rate_) + self.robot_name_ = rospy.get_param("~robot_name", "UGV") + self.body_ros_topic_ = "/front_rgbd_camera/rgb/image_raw" + self.arm_ros_topic_ = "/wrist_rgbd_camera/rgb/image_raw" + self.body_kafka_topic_ = "ugv.image.body" + self.arm_kafka_topic_ = "ugv.image.arm" + + def send_body_image_cb( + self, req: SendImageToKafkaRequest + ) -> SendImageToKafkaResponse: + + return self.send_to_kafka( + self.body_kafka_topic_, req.message, req.image + ) + + def send_arm_image_cb( + self, req: SendImageToKafkaRequest + ) -> SendImageToKafkaResponse: + + return self.send_to_kafka(self.arm_kafka_topic_, req.message, req.image) + + def send_to_kafka(self, kafka_topic, msg, img) -> SendImageToKafkaResponse: + try: + cv_image = self.bridge.imgmsg_to_cv2( + img, desired_encoding="passthrough" + ) + _, buffer = cv2.imencode( + ".jpg", cv_image, [cv2.IMWRITE_JPEG_QUALITY, 50] + ) + base64_image = base64.b64encode(buffer).decode("utf-8") + + # Create a json message + json_message = {"message": msg, "image_data": base64_image} + rospy.loginfo(f"Encoded image to Base64 for topic {kafka_topic}") + self.producer.send(kafka_topic, json_message) + return SendImageToKafkaResponse(success=True) + except Exception as err: + rospy.logerr( + f"Failed to process image message for topic {kafka_topic}: {err}" + ) + return SendImageToKafkaResponse(success=False) + + +if __name__ == "__main__": + + try: + node = KafkaImagePublisher() + except rospy.ROSInterruptException: + pass + + rospy.loginfo("Exiting") diff --git a/src/kafka_publisher.py b/src/kafka_publisher.py index 7a34268..aafa03f 100755 --- a/src/kafka_publisher.py +++ b/src/kafka_publisher.py @@ -37,7 +37,7 @@ def __init__(self): ) self.bridge = CvBridge() - topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) + self.topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) # initialise admin client to create topics self.admin_client = AdminClient( @@ -51,7 +51,7 @@ def __init__(self): rospy.logerr(f"Failed to connect to Kafka: {err}") rospy.signal_shutdown("Kafka connection failed.") - self.create_kafka_topics(topics_dict) + self.create_kafka_topics(self.topics_dict) # start kafka producer self.producer = KafkaProducer( @@ -60,28 +60,24 @@ def __init__(self): value_serializer=lambda m: json.dumps(m).encode("ascii"), ) - for msg_type, topics in topics_dict.items(): - ros_topic = topics["ros_topic"] - kafka_topic = topics["kafka_topic"] + # create topic storage for the latest messages + self.latest_msgs = { + details["ros_topic"]: None for details in self.topics_dict.values() + } + + # Subscribers for all topics + for msg_type, details in self.topics_dict.items(): + ros_topic = details["ros_topic"] msg_class = utils.import_msg_type(msg_type) - if msg_class == Image: - rospy.Subscriber( - ros_topic, - msg_class, - lambda msg, kafka_topic=kafka_topic: self.image_callback(msg, kafka_topic), - ) - rospy.loginfo( - f"Subscribing to Image topic: {ros_topic} -> KAFKA: {kafka_topic}" - ) - else: - rospy.Subscriber( - ros_topic, - msg_class, - lambda msg, kafka_topic=kafka_topic: self.callback(msg, kafka_topic), - ) - rospy.loginfo( - f"Using {msg_type} from ROS: {ros_topic} -> KAFKA: {kafka_topic}" - ) + rospy.Subscriber( + ros_topic, + msg_class, + self.message_callback, + callback_args=ros_topic, + ) + rospy.loginfo(f"Subscribed to ROS topic: {ros_topic}") + + self.run() def load_parameters(self) -> None: self._filename = rospy.get_param("~topics_filename", "topics.yaml") @@ -128,44 +124,41 @@ def create_kafka_topics(self, topics_dict: dict) -> None: else: rospy.logerr("All kafka topics already exist.") - def image_callback(self, msg, kafka_topic: str) -> None: - ### also log to Robotnik - try: - # convert ros image to compressed jpeg and base64 encode - cv_image = self.bridge.imgmsg_to_cv2( - msg, desired_encoding="passthrough" - ) - _, buffer = cv2.imencode( - ".jpg", cv_image, [cv2.IMWRITE_JPEG_QUALITY, 50] - ) - base64_image = base64.b64encode(buffer).decode("utf-8") - - # Create a json message - json_message = {"topic": kafka_topic, "image_data": base64_image} - rospy.loginfo(f"Encoded image to Base64 for topic {kafka_topic}") - self.producer.send(kafka_topic, json_message) - except Exception as err: - rospy.logerr( - f"Failed to process image message for topic {kafka_topic}: {err}" - ) - - def callback(self, msg, kafka_topic: str) -> None: + def message_callback(self, msg, ros_topic): """ - takes msg from ros, converts to json, publishes to kafka - - :param msg: ros msg from subscriber - :param kafka_topic (str): kafka topic name - + stores latest ros msg """ - rospy.loginfo(f"Received message on ROS topic -> {kafka_topic}") + self.latest_msgs[ros_topic] = msg - json_str = json_message_converter.convert_ros_message_to_json(msg) + def publish_to_kafka(self): + """ + publish the latest messages to their respective Kafka topics. + """ + for msg_type, details in self.topics_dict.items(): + ros_topic = details["ros_topic"] + kafka_topic = details["kafka_topic"] + msg = self.latest_msgs[ros_topic] + + if msg is None: + continue # Skip if no message has been received yet + + try: + # Convert other messages to JSON + json_message = ( + json_message_converter.convert_ros_message_to_json(msg) + ) - self.producer.send(kafka_topic, json_str) + self.producer.send(kafka_topic, json_message) + rospy.loginfo(f"Published to Kafka topic: {kafka_topic}") + except Exception as e: + rospy.logerr( + f"Failed to publish message from {ros_topic} to {kafka_topic}: {e}" + ) def run(self): rate = rospy.Rate(self._update_rate) while not rospy.is_shutdown(): + self.publish_to_kafka() rate.sleep() def shutdown(self): diff --git a/src/utils.py b/src/utils.py index 5ee5cba..05ab14f 100755 --- a/src/utils.py +++ b/src/utils.py @@ -140,6 +140,14 @@ def import_msg_type(msg_type: str): from tf2_msgs.msg import TFMessage subscriber_msg = TFMessage + elif msg_type == "robotnik_msgs/BatteryStatus": + from robotnik_msgs.msg import BatteryStatus + + subscriber_msg = BatteryStatus + elif msg_type == "robotnik_msgs/State": + from robotnik_msgs.msg import State + + subscriber_msg = State else: raise ValueError( f'MSG "{msg_type}" IS NOT SUPPORTED \nPlease add imports to utils.py for specific msg type.'