From 924ee7b12774597d196b741382421aebfe472791 Mon Sep 17 00:00:00 2001 From: Lucy Harris Date: Wed, 11 Dec 2024 17:31:10 +0100 Subject: [PATCH 1/3] can publish image from srv --- scripts/image_consumer.py | 2 +- src/kafka_publisher.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) mode change 100644 => 100755 scripts/image_consumer.py diff --git a/scripts/image_consumer.py b/scripts/image_consumer.py old mode 100644 new mode 100755 index ddfbd6b..6fa7dc1 --- a/scripts/image_consumer.py +++ b/scripts/image_consumer.py @@ -37,7 +37,7 @@ def consume_images(kafka_topic: str, kafka_server: str): parser = argparse.ArgumentParser() - parser.add_argument("-t", "--topic", default="heron.sim-test.wrist-camera", help="Kafka topic name") + parser.add_argument("-t", "--topic", default="ugv.image.body", help="Kafka topic name") parser.add_argument("-s", "--server", default="10.2.0.8:9092", help="Kafka bootstrap server") args = parser.parse_args() diff --git a/src/kafka_publisher.py b/src/kafka_publisher.py index 7980c65..7a34268 100755 --- a/src/kafka_publisher.py +++ b/src/kafka_publisher.py @@ -14,7 +14,6 @@ class KafkaPublisher: - # TODO need to make kafka topics!! """ takes a yaml file with: - ros msg types @@ -130,6 +129,7 @@ def create_kafka_topics(self, topics_dict: dict) -> None: rospy.logerr("All kafka topics already exist.") def image_callback(self, msg, kafka_topic: str) -> None: + ### also log to Robotnik try: # convert ros image to compressed jpeg and base64 encode cv_image = self.bridge.imgmsg_to_cv2( From 91efdd793a1f283381c9b80ccd6ac10711cf9463 Mon Sep 17 00:00:00 2001 From: Lucy Harris Date: Wed, 15 Jan 2025 18:37:51 +0100 Subject: [PATCH 2/3] image pub now srv, adding defined kafka defs for IMS --- CMakeLists.txt | 11 ++- config/heron_pilot.yaml | 19 ++++++ config/pothole_test.yaml | 7 ++ config/topics.yaml | 21 +++--- launch/kafka_create_topics.launch | 19 ++++++ package.xml | 20 ++++-- scripts/image_consumer.py | 2 +- src/kafka_create_topics.py | 107 +++++++++++++++++++++++++++++ src/kafka_image_publisher.py | 109 ++++++++++++++++++++++++++++++ src/kafka_publisher.py | 99 +++++++++++++-------------- src/utils.py | 8 +++ 11 files changed, 351 insertions(+), 71 deletions(-) create mode 100644 config/heron_pilot.yaml create mode 100644 config/pothole_test.yaml create mode 100644 launch/kafka_create_topics.launch create mode 100755 src/kafka_create_topics.py create mode 100755 src/kafka_image_publisher.py diff --git a/CMakeLists.txt b/CMakeLists.txt index ce67495..48d4aa3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,4 +3,13 @@ project(ros_kafka_connector) find_package(catkin REQUIRED) catkin_python_setup() -catkin_package() +catkin_package( + CATKIN_DEPENDS + rospy + std_msgs + geometry_msgs + sensor_msgs + tf2_msgs + heron_msgs + robotnik_msgs +) diff --git a/config/heron_pilot.yaml b/config/heron_pilot.yaml new file mode 100644 index 0000000..bbc60e0 --- /dev/null +++ b/config/heron_pilot.yaml @@ -0,0 +1,19 @@ +UGV: + - msg_type: "sensor_msgs/Image" + ros_topic: "/robot/arm_camera/front_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.arm-camera.rgb.image-raw" + - msg_type: "sensor_msgs/Image" + ros_topic: "/robot/body_camera/front_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.body-camera.rgb.image-raw" + - msg_type: "sensor_msgs/NavSatFix" + ros_topic: "/robot/gps/fix" + kafka_topic: "ugv.gps-fix" + - msg_type: "robotnik_msgs/BatteryStatus" + ros_topic: "/robot/battery_estimator/data" + kafka_topic: "ugv.battery-status" + - msg_type: "robotnik_msgs/State" + ros_topic: "/robot/robot_local_control/RobotStatusComponent/state" + kafka_topic: "ugv.status" + - msg_type: "std_msgs/String" + ros_topic: "/hlp/state" + kafka_topic: "ugv.hlp.status" diff --git a/config/pothole_test.yaml b/config/pothole_test.yaml new file mode 100644 index 0000000..6ffd27f --- /dev/null +++ b/config/pothole_test.yaml @@ -0,0 +1,7 @@ +UGV: + - msg_type: "sensor_msgs/Image" + ros_topic: "/wrist_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.image.arm" + - msg_type: "sensor_msgs/Image" + ros_topic: "/front_rgbd_camera/rgb/image_raw" + kafka_topic: "ugv.image.body" diff --git a/config/topics.yaml b/config/topics.yaml index c94e646..85caa0a 100644 --- a/config/topics.yaml +++ b/config/topics.yaml @@ -1,10 +1,13 @@ UGV: - - msg_type: "sensor_msgs/Image" - ros_topic: "/wrist_camera/color/image_raw" - kafka_topic: "wrist_camera" - - msg_type: "tf2_msgs/TFMessage" - ros_topic: "/tf" - kafka_topic: "frames" - - msg_type: "sensor_msgs/JointState" - ros_topic: "/joint_state" - kafka_topic: "arm_joints" \ No newline at end of file + - msg_type: "robotnik_msgs/BatteryStatus" + ros_topic: "/robot/battery_estimator/data" + kafka_topic: "ugv.battery.status" + - msg_type: "sensor_msgs/NavSatFix" + ros_topic: "/robot/gps/fix" + kafka_topic: "ugv.gps.fix" + - msg_type: "robotnik_msgs/State" + ros_topic: "/robot/robot_local_control/RobotStatusComponent/state" + kafka_topic: "ugv.state" + - msg_type: "std_msgs/String" + ros_topic: "/hlp/state" + kafka_topic: "hlp.state" diff --git a/launch/kafka_create_topics.launch b/launch/kafka_create_topics.launch new file mode 100644 index 0000000..ddd4d9d --- /dev/null +++ b/launch/kafka_create_topics.launch @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + + + + + + diff --git a/package.xml b/package.xml index 82121a3..b79e3b4 100644 --- a/package.xml +++ b/package.xml @@ -9,13 +9,19 @@ MIT catkin - roscpp - rospy - std_msgs - geometry_msgs - sensor_msgs - nav_msgs - tf2_msgs + + rospy + std_msgs + geometry_msgs + sensor_msgs + nav_msgs + tf2_msgs + + + heron_msgs + robotnik_msgs + heron_msgs + robotnik_msgs rospy_message_converter diff --git a/scripts/image_consumer.py b/scripts/image_consumer.py index 6fa7dc1..b5b153c 100755 --- a/scripts/image_consumer.py +++ b/scripts/image_consumer.py @@ -37,7 +37,7 @@ def consume_images(kafka_topic: str, kafka_server: str): parser = argparse.ArgumentParser() - parser.add_argument("-t", "--topic", default="ugv.image.body", help="Kafka topic name") + parser.add_argument("-t", "--topic", default="test.image.stream", help="Kafka topic name") parser.add_argument("-s", "--server", default="10.2.0.8:9092", help="Kafka bootstrap server") args = parser.parse_args() diff --git a/src/kafka_create_topics.py b/src/kafka_create_topics.py new file mode 100755 index 0000000..725ec50 --- /dev/null +++ b/src/kafka_create_topics.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python + +import json +import base64 +import cv2 +from cv_bridge import CvBridge +from kafka import KafkaProducer, KafkaConsumer +from confluent_kafka.admin import AdminClient, NewTopic +import rospy +import rospkg +from rospy_message_converter import json_message_converter +from sensor_msgs.msg import Image +import utils + + +class KafkaCreateTopics: + """ + takes a yaml file with: + - ros msg types + - ros topic names + - kafka topic names + + subscribes to the ros topic, and converts to json format + publishes json msg to the kafka topic + + """ + + def __init__(self): + + rospy.init_node("kafka_topic_creator") + + self.load_parameters() + pkg = rospkg.RosPack() + yaml_file = ( + pkg.get_path("ros_kafka_connector") + "/config/" + self._filename + ) + self.bridge = CvBridge() + + topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) + + # initialise admin client to create topics + self.admin_client = AdminClient( + {"bootstrap.servers": self._bootstrap_server} + ) + + try: + self.admin_client.list_topics(timeout=5) + rospy.loginfo("Kafka connection successful.") + except Exception as err: + rospy.logerr(f"Failed to connect to Kafka: {err}") + rospy.signal_shutdown("Kafka connection failed.") + + self.create_kafka_topics(topics_dict) + + def load_parameters(self) -> None: + self._filename = rospy.get_param("~topics_filename", "topics.yaml") + self._bootstrap_server = rospy.get_param( + "~bootstrap_server", "10.2.0.8:9092" + ) + self._security_protocol = rospy.get_param( + "~security_protocol", "PLAINTEXT" + ) + self._update_rate = float(rospy.get_param("~update_rate", "10.0")) + self._robot_name = rospy.get_param("~robot_name", "UGV") + + def create_kafka_topics(self, topics_dict: dict) -> None: + """ + creates kafka topics based on config + + :param topics_dict (dict): dictionary of kafka & ros topics + """ + kafka_topics = [ + topics["kafka_topic"] for topics in topics_dict.values() + ] + + # check topic doesn't already exist + existing_topics = self.admin_client.list_topics().topics.keys() + new_topics = [ + NewTopic(topic, num_partitions=1, replication_factor=1) + for topic in kafka_topics + if topic not in existing_topics + ] + + if new_topics: + rospy.loginfo( + f"Creating kafka topic {[t.topic for t in new_topics]}" + ) + futures = self.admin_client.create_topics(new_topics) + + for topic, future in futures.items(): + try: + future.result() # wait for op to finish + rospy.loginfo(f"Kafka topic '{topic}' created sucessfully!") + except Exception as err: + rospy.logerr(f"Failed to create topic '{topic}' : {err}") + + else: + rospy.logerr("All kafka topics already exist.") + +if __name__ == "__main__": + + try: + node = KafkaCreateTopics() + except rospy.ROSInterruptException: + pass + + rospy.loginfo("Exiting") diff --git a/src/kafka_image_publisher.py b/src/kafka_image_publisher.py new file mode 100755 index 0000000..3f3e5a3 --- /dev/null +++ b/src/kafka_image_publisher.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python + +import json +import base64 +import cv2 +from cv_bridge import CvBridge +from kafka import KafkaProducer, KafkaConsumer +from confluent_kafka.admin import AdminClient, NewTopic +import rospy +import rospkg +from rospy_message_converter import json_message_converter +from sensor_msgs.msg import Image +from std_srvs.srv import Empty + +from heron_msgs.srv import ( + SendImageToKafka, + SendImageToKafkaRequest, + SendImageToKafkaResponse, +) + +class KafkaImagePublisher: + + def __init__(self): + + rospy.init_node("kafka_image_publisher") + + self.load_parameters() + self.bridge = CvBridge() + + # start kafka producer + self.producer = KafkaProducer( + bootstrap_servers=self.bootstrap_server_, + security_protocol=self.security_protocol_, + value_serializer=lambda m: json.dumps(m).encode("ascii"), + ) + + self.bridge = CvBridge() + + self.body_img_srv = rospy.Service( + "/kafka/publish_body_image", + SendImageToKafka, + self.send_body_image_cb, + ) + self.arm_img_srv = rospy.Service( + "/kafka/publish_arm_image", SendImageToKafka, self.send_arm_image_cb + ) + + rospy.spin() + + def load_parameters(self) -> None: + self.filename_ = rospy.get_param("~topics_filename", "topics.yaml") + self.bootstrap_server_ = rospy.get_param( + "~bootstrap_server", "10.2.0.8:9092" + ) + self.security_protocol_ = rospy.get_param( + "~security_protocol", "PLAINTEXT" + ) + self.update_rate_ = float(rospy.get_param("~update_rate", "10.0")) + self.rate_ = rospy.Rate(self.update_rate_) + self.robot_name_ = rospy.get_param("~robot_name", "UGV") + self.body_ros_topic_ = "/front_rgbd_camera/rgb/image_raw" + self.arm_ros_topic_ = "/wrist_rgbd_camera/rgb/image_raw" + self.body_kafka_topic_ = "ugv.image.body" + self.arm_kafka_topic_ = "ugv.image.arm" + + def send_body_image_cb( + self, req: SendImageToKafkaRequest + ) -> SendImageToKafkaResponse: + + return self.send_to_kafka( + self.body_kafka_topic_, req.message, req.image + ) + + def send_arm_image_cb( + self, req: SendImageToKafkaRequest + ) -> SendImageToKafkaResponse: + + return self.send_to_kafka(self.arm_kafka_topic_, req.message, req.image) + + def send_to_kafka(self, kafka_topic, msg, img) -> SendImageToKafkaResponse: + try: + cv_image = self.bridge.imgmsg_to_cv2( + img, desired_encoding="passthrough" + ) + _, buffer = cv2.imencode( + ".jpg", cv_image, [cv2.IMWRITE_JPEG_QUALITY, 50] + ) + base64_image = base64.b64encode(buffer).decode("utf-8") + + # Create a json message + json_message = {"message": msg, "image_data": base64_image} + rospy.loginfo(f"Encoded image to Base64 for topic {kafka_topic}") + self.producer.send(kafka_topic, json_message) + return SendImageToKafkaResponse(success=True) + except Exception as err: + rospy.logerr( + f"Failed to process image message for topic {kafka_topic}: {err}" + ) + return SendImageToKafkaResponse(success=False) + + +if __name__ == "__main__": + + try: + node = KafkaImagePublisher() + except rospy.ROSInterruptException: + pass + + rospy.loginfo("Exiting") diff --git a/src/kafka_publisher.py b/src/kafka_publisher.py index 7a34268..aafa03f 100755 --- a/src/kafka_publisher.py +++ b/src/kafka_publisher.py @@ -37,7 +37,7 @@ def __init__(self): ) self.bridge = CvBridge() - topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) + self.topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) # initialise admin client to create topics self.admin_client = AdminClient( @@ -51,7 +51,7 @@ def __init__(self): rospy.logerr(f"Failed to connect to Kafka: {err}") rospy.signal_shutdown("Kafka connection failed.") - self.create_kafka_topics(topics_dict) + self.create_kafka_topics(self.topics_dict) # start kafka producer self.producer = KafkaProducer( @@ -60,28 +60,24 @@ def __init__(self): value_serializer=lambda m: json.dumps(m).encode("ascii"), ) - for msg_type, topics in topics_dict.items(): - ros_topic = topics["ros_topic"] - kafka_topic = topics["kafka_topic"] + # create topic storage for the latest messages + self.latest_msgs = { + details["ros_topic"]: None for details in self.topics_dict.values() + } + + # Subscribers for all topics + for msg_type, details in self.topics_dict.items(): + ros_topic = details["ros_topic"] msg_class = utils.import_msg_type(msg_type) - if msg_class == Image: - rospy.Subscriber( - ros_topic, - msg_class, - lambda msg, kafka_topic=kafka_topic: self.image_callback(msg, kafka_topic), - ) - rospy.loginfo( - f"Subscribing to Image topic: {ros_topic} -> KAFKA: {kafka_topic}" - ) - else: - rospy.Subscriber( - ros_topic, - msg_class, - lambda msg, kafka_topic=kafka_topic: self.callback(msg, kafka_topic), - ) - rospy.loginfo( - f"Using {msg_type} from ROS: {ros_topic} -> KAFKA: {kafka_topic}" - ) + rospy.Subscriber( + ros_topic, + msg_class, + self.message_callback, + callback_args=ros_topic, + ) + rospy.loginfo(f"Subscribed to ROS topic: {ros_topic}") + + self.run() def load_parameters(self) -> None: self._filename = rospy.get_param("~topics_filename", "topics.yaml") @@ -128,44 +124,41 @@ def create_kafka_topics(self, topics_dict: dict) -> None: else: rospy.logerr("All kafka topics already exist.") - def image_callback(self, msg, kafka_topic: str) -> None: - ### also log to Robotnik - try: - # convert ros image to compressed jpeg and base64 encode - cv_image = self.bridge.imgmsg_to_cv2( - msg, desired_encoding="passthrough" - ) - _, buffer = cv2.imencode( - ".jpg", cv_image, [cv2.IMWRITE_JPEG_QUALITY, 50] - ) - base64_image = base64.b64encode(buffer).decode("utf-8") - - # Create a json message - json_message = {"topic": kafka_topic, "image_data": base64_image} - rospy.loginfo(f"Encoded image to Base64 for topic {kafka_topic}") - self.producer.send(kafka_topic, json_message) - except Exception as err: - rospy.logerr( - f"Failed to process image message for topic {kafka_topic}: {err}" - ) - - def callback(self, msg, kafka_topic: str) -> None: + def message_callback(self, msg, ros_topic): """ - takes msg from ros, converts to json, publishes to kafka - - :param msg: ros msg from subscriber - :param kafka_topic (str): kafka topic name - + stores latest ros msg """ - rospy.loginfo(f"Received message on ROS topic -> {kafka_topic}") + self.latest_msgs[ros_topic] = msg - json_str = json_message_converter.convert_ros_message_to_json(msg) + def publish_to_kafka(self): + """ + publish the latest messages to their respective Kafka topics. + """ + for msg_type, details in self.topics_dict.items(): + ros_topic = details["ros_topic"] + kafka_topic = details["kafka_topic"] + msg = self.latest_msgs[ros_topic] + + if msg is None: + continue # Skip if no message has been received yet + + try: + # Convert other messages to JSON + json_message = ( + json_message_converter.convert_ros_message_to_json(msg) + ) - self.producer.send(kafka_topic, json_str) + self.producer.send(kafka_topic, json_message) + rospy.loginfo(f"Published to Kafka topic: {kafka_topic}") + except Exception as e: + rospy.logerr( + f"Failed to publish message from {ros_topic} to {kafka_topic}: {e}" + ) def run(self): rate = rospy.Rate(self._update_rate) while not rospy.is_shutdown(): + self.publish_to_kafka() rate.sleep() def shutdown(self): diff --git a/src/utils.py b/src/utils.py index 5ee5cba..05ab14f 100755 --- a/src/utils.py +++ b/src/utils.py @@ -140,6 +140,14 @@ def import_msg_type(msg_type: str): from tf2_msgs.msg import TFMessage subscriber_msg = TFMessage + elif msg_type == "robotnik_msgs/BatteryStatus": + from robotnik_msgs.msg import BatteryStatus + + subscriber_msg = BatteryStatus + elif msg_type == "robotnik_msgs/State": + from robotnik_msgs.msg import State + + subscriber_msg = State else: raise ValueError( f'MSG "{msg_type}" IS NOT SUPPORTED \nPlease add imports to utils.py for specific msg type.' From 13167516643bf34a719534b8228f771f14b042d6 Mon Sep 17 00:00:00 2001 From: Lucy Harris Date: Thu, 16 Jan 2025 11:28:55 +0100 Subject: [PATCH 3/3] formatting --- README.md | 2 ++ setup.py | 2 +- src/kafka_create_topics.py | 10 ++-------- src/kafka_image_publisher.py | 14 ++++++++++++-- src/kafka_publisher.py | 31 +++++++++++++++---------------- src/utils.py | 4 ++-- 6 files changed, 34 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 0c393b1..3269f90 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,8 @@ topic_dict: { ``` Message types that are added are in the [`utils.py`](https://github.com/ethz-asl/ros-kafka-connector/blob/master/src/utils.py) file. To add new message types to the converter, add cases to `import_msg_type` function. If you have custom messages from ROS, you need to make them callable in your ros workspace. +This package does depend on both [`robotnik_msgs`](https://github.com/RobotnikAutomation/robotnik_msgs/tree/ros-devel) and [`heron_msgs`](https://github.com/RobotnikAutomation/heron_msgs). Make sure to add these packages to your catkin ws. + | Parameter | Info | Default | | ------------- |:-------------:| -----:| | bootstrap_server | IP of kafka server | "localhost:9092" | diff --git a/setup.py b/setup.py index 4c26163..8527a83 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ # fetch values from package.xml. setup_args = generate_distutils_setup( - packages=["ros_kafka_connector"], + packages=["ros_kafka_connector"], package_dir={"": "src"} ) diff --git a/src/kafka_create_topics.py b/src/kafka_create_topics.py index 725ec50..8f8d380 100755 --- a/src/kafka_create_topics.py +++ b/src/kafka_create_topics.py @@ -1,15 +1,9 @@ #!/usr/bin/env python -import json -import base64 -import cv2 from cv_bridge import CvBridge -from kafka import KafkaProducer, KafkaConsumer from confluent_kafka.admin import AdminClient, NewTopic import rospy import rospkg -from rospy_message_converter import json_message_converter -from sensor_msgs.msg import Image import utils @@ -20,8 +14,7 @@ class KafkaCreateTopics: - ros topic names - kafka topic names - subscribes to the ros topic, and converts to json format - publishes json msg to the kafka topic + creates kafka topics """ @@ -97,6 +90,7 @@ def create_kafka_topics(self, topics_dict: dict) -> None: else: rospy.logerr("All kafka topics already exist.") + if __name__ == "__main__": try: diff --git a/src/kafka_image_publisher.py b/src/kafka_image_publisher.py index 3f3e5a3..e1889e1 100755 --- a/src/kafka_image_publisher.py +++ b/src/kafka_image_publisher.py @@ -19,7 +19,16 @@ ) class KafkaImagePublisher: + """ + sends ROS images to kafka with ROS srv call + images in kafka are in the Base64 type. + services will send an image before and after image processing + e.g. pothole detection + + hardcoded ros topics & kafka topics + kafka topics should already be created + """ def __init__(self): rospy.init_node("kafka_image_publisher") @@ -34,8 +43,9 @@ def __init__(self): value_serializer=lambda m: json.dumps(m).encode("ascii"), ) - self.bridge = CvBridge() + self.bridge = CvBridge() # start bridge to convert images + # services for before/after image processing self.body_img_srv = rospy.Service( "/kafka/publish_body_image", SendImageToKafka, @@ -77,7 +87,7 @@ def send_arm_image_cb( return self.send_to_kafka(self.arm_kafka_topic_, req.message, req.image) - def send_to_kafka(self, kafka_topic, msg, img) -> SendImageToKafkaResponse: + def send_to_kafka(self, kafka_topic: str, msg: str, img: Image) -> SendImageToKafkaResponse: try: cv_image = self.bridge.imgmsg_to_cv2( img, desired_encoding="passthrough" diff --git a/src/kafka_publisher.py b/src/kafka_publisher.py index aafa03f..73d9545 100755 --- a/src/kafka_publisher.py +++ b/src/kafka_publisher.py @@ -1,15 +1,15 @@ #!/usr/bin/env python -import json -import base64 -import cv2 -from cv_bridge import CvBridge -from kafka import KafkaProducer, KafkaConsumer -from confluent_kafka.admin import AdminClient, NewTopic import rospy import rospkg +import json + +from typing import Dict, Any +from kafka import KafkaProducer +from confluent_kafka.admin import AdminClient, NewTopic from rospy_message_converter import json_message_converter from sensor_msgs.msg import Image + import utils @@ -25,7 +25,7 @@ class KafkaPublisher: """ - def __init__(self): + def __init__(self) -> None: rospy.init_node("kafka_publisher") rospy.on_shutdown(self.shutdown) @@ -35,7 +35,6 @@ def __init__(self): yaml_file = ( pkg.get_path("ros_kafka_connector") + "/config/" + self._filename ) - self.bridge = CvBridge() self.topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) @@ -65,7 +64,7 @@ def __init__(self): details["ros_topic"]: None for details in self.topics_dict.values() } - # Subscribers for all topics + # subscribers for all topics for msg_type, details in self.topics_dict.items(): ros_topic = details["ros_topic"] msg_class = utils.import_msg_type(msg_type) @@ -90,7 +89,7 @@ def load_parameters(self) -> None: self._update_rate = float(rospy.get_param("~update_rate", "10.0")) self._robot_name = rospy.get_param("~robot_name", "UGV") - def create_kafka_topics(self, topics_dict: dict) -> None: + def create_kafka_topics(self, topics_dict: Dict[str, Dict[str, str]]) -> None: """ creates kafka topics based on config @@ -124,13 +123,13 @@ def create_kafka_topics(self, topics_dict: dict) -> None: else: rospy.logerr("All kafka topics already exist.") - def message_callback(self, msg, ros_topic): + def message_callback(self, msg: Any, ros_topic: str) -> None: """ stores latest ros msg """ self.latest_msgs[ros_topic] = msg - def publish_to_kafka(self): + def publish_to_kafka(self) -> None: """ publish the latest messages to their respective Kafka topics. """ @@ -140,10 +139,10 @@ def publish_to_kafka(self): msg = self.latest_msgs[ros_topic] if msg is None: - continue # Skip if no message has been received yet + continue # skip if no message has been received yet try: - # Convert other messages to JSON + # convert messages to JSON json_message = ( json_message_converter.convert_ros_message_to_json(msg) ) @@ -155,13 +154,13 @@ def publish_to_kafka(self): f"Failed to publish message from {ros_topic} to {kafka_topic}: {e}" ) - def run(self): + def run(self) -> None: rate = rospy.Rate(self._update_rate) while not rospy.is_shutdown(): self.publish_to_kafka() rate.sleep() - def shutdown(self): + def shutdown(self) -> None: rospy.loginfo("Shutting down") diff --git a/src/utils.py b/src/utils.py index 05ab14f..34140a5 100755 --- a/src/utils.py +++ b/src/utils.py @@ -13,7 +13,7 @@ def load_yaml_to_dict(yaml_file: str, robot_name: str = "UGV") -> dict: takes yaml file and converts to dict which includes topic names & msg types :param yaml_file (str): path to yaml file - :param robot_name(str, optional): first line of yaml file, default -> 'UGV' + :param robot_name(str, optional): first line of yaml file, default -> 'UGV' :return topic_dict (dict): dictionary mapping topics & msg types example: topic_list.yaml @@ -51,7 +51,7 @@ def load_yaml_to_dict(yaml_file: str, robot_name: str = "UGV") -> dict: def import_msg_type(msg_type: str): """ takes a ros msg_type and dynamically imports the msg type and returns it - + :params msg_type (str): the string identifier for the ROS msg type :return subscriber_type (class): the corresponding ROS msg class :raises ValueError: if msg_type is not found