From 13167516643bf34a719534b8228f771f14b042d6 Mon Sep 17 00:00:00 2001 From: Lucy Harris Date: Thu, 16 Jan 2025 11:28:55 +0100 Subject: [PATCH] formatting --- README.md | 2 ++ setup.py | 2 +- src/kafka_create_topics.py | 10 ++-------- src/kafka_image_publisher.py | 14 ++++++++++++-- src/kafka_publisher.py | 31 +++++++++++++++---------------- src/utils.py | 4 ++-- 6 files changed, 34 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 0c393b1..3269f90 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,8 @@ topic_dict: { ``` Message types that are added are in the [`utils.py`](https://github.com/ethz-asl/ros-kafka-connector/blob/master/src/utils.py) file. To add new message types to the converter, add cases to `import_msg_type` function. If you have custom messages from ROS, you need to make them callable in your ros workspace. +This package does depend on both [`robotnik_msgs`](https://github.com/RobotnikAutomation/robotnik_msgs/tree/ros-devel) and [`heron_msgs`](https://github.com/RobotnikAutomation/heron_msgs). Make sure to add these packages to your catkin ws. + | Parameter | Info | Default | | ------------- |:-------------:| -----:| | bootstrap_server | IP of kafka server | "localhost:9092" | diff --git a/setup.py b/setup.py index 4c26163..8527a83 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ # fetch values from package.xml. setup_args = generate_distutils_setup( - packages=["ros_kafka_connector"], + packages=["ros_kafka_connector"], package_dir={"": "src"} ) diff --git a/src/kafka_create_topics.py b/src/kafka_create_topics.py index 725ec50..8f8d380 100755 --- a/src/kafka_create_topics.py +++ b/src/kafka_create_topics.py @@ -1,15 +1,9 @@ #!/usr/bin/env python -import json -import base64 -import cv2 from cv_bridge import CvBridge -from kafka import KafkaProducer, KafkaConsumer from confluent_kafka.admin import AdminClient, NewTopic import rospy import rospkg -from rospy_message_converter import json_message_converter -from sensor_msgs.msg import Image import utils @@ -20,8 +14,7 @@ class KafkaCreateTopics: - ros topic names - kafka topic names - subscribes to the ros topic, and converts to json format - publishes json msg to the kafka topic + creates kafka topics """ @@ -97,6 +90,7 @@ def create_kafka_topics(self, topics_dict: dict) -> None: else: rospy.logerr("All kafka topics already exist.") + if __name__ == "__main__": try: diff --git a/src/kafka_image_publisher.py b/src/kafka_image_publisher.py index 3f3e5a3..e1889e1 100755 --- a/src/kafka_image_publisher.py +++ b/src/kafka_image_publisher.py @@ -19,7 +19,16 @@ ) class KafkaImagePublisher: + """ + sends ROS images to kafka with ROS srv call + images in kafka are in the Base64 type. + services will send an image before and after image processing + e.g. pothole detection + + hardcoded ros topics & kafka topics + kafka topics should already be created + """ def __init__(self): rospy.init_node("kafka_image_publisher") @@ -34,8 +43,9 @@ def __init__(self): value_serializer=lambda m: json.dumps(m).encode("ascii"), ) - self.bridge = CvBridge() + self.bridge = CvBridge() # start bridge to convert images + # services for before/after image processing self.body_img_srv = rospy.Service( "/kafka/publish_body_image", SendImageToKafka, @@ -77,7 +87,7 @@ def send_arm_image_cb( return self.send_to_kafka(self.arm_kafka_topic_, req.message, req.image) - def send_to_kafka(self, kafka_topic, msg, img) -> SendImageToKafkaResponse: + def send_to_kafka(self, kafka_topic: str, msg: str, img: Image) -> SendImageToKafkaResponse: try: cv_image = self.bridge.imgmsg_to_cv2( img, desired_encoding="passthrough" diff --git a/src/kafka_publisher.py b/src/kafka_publisher.py index aafa03f..73d9545 100755 --- a/src/kafka_publisher.py +++ b/src/kafka_publisher.py @@ -1,15 +1,15 @@ #!/usr/bin/env python -import json -import base64 -import cv2 -from cv_bridge import CvBridge -from kafka import KafkaProducer, KafkaConsumer -from confluent_kafka.admin import AdminClient, NewTopic import rospy import rospkg +import json + +from typing import Dict, Any +from kafka import KafkaProducer +from confluent_kafka.admin import AdminClient, NewTopic from rospy_message_converter import json_message_converter from sensor_msgs.msg import Image + import utils @@ -25,7 +25,7 @@ class KafkaPublisher: """ - def __init__(self): + def __init__(self) -> None: rospy.init_node("kafka_publisher") rospy.on_shutdown(self.shutdown) @@ -35,7 +35,6 @@ def __init__(self): yaml_file = ( pkg.get_path("ros_kafka_connector") + "/config/" + self._filename ) - self.bridge = CvBridge() self.topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) @@ -65,7 +64,7 @@ def __init__(self): details["ros_topic"]: None for details in self.topics_dict.values() } - # Subscribers for all topics + # subscribers for all topics for msg_type, details in self.topics_dict.items(): ros_topic = details["ros_topic"] msg_class = utils.import_msg_type(msg_type) @@ -90,7 +89,7 @@ def load_parameters(self) -> None: self._update_rate = float(rospy.get_param("~update_rate", "10.0")) self._robot_name = rospy.get_param("~robot_name", "UGV") - def create_kafka_topics(self, topics_dict: dict) -> None: + def create_kafka_topics(self, topics_dict: Dict[str, Dict[str, str]]) -> None: """ creates kafka topics based on config @@ -124,13 +123,13 @@ def create_kafka_topics(self, topics_dict: dict) -> None: else: rospy.logerr("All kafka topics already exist.") - def message_callback(self, msg, ros_topic): + def message_callback(self, msg: Any, ros_topic: str) -> None: """ stores latest ros msg """ self.latest_msgs[ros_topic] = msg - def publish_to_kafka(self): + def publish_to_kafka(self) -> None: """ publish the latest messages to their respective Kafka topics. """ @@ -140,10 +139,10 @@ def publish_to_kafka(self): msg = self.latest_msgs[ros_topic] if msg is None: - continue # Skip if no message has been received yet + continue # skip if no message has been received yet try: - # Convert other messages to JSON + # convert messages to JSON json_message = ( json_message_converter.convert_ros_message_to_json(msg) ) @@ -155,13 +154,13 @@ def publish_to_kafka(self): f"Failed to publish message from {ros_topic} to {kafka_topic}: {e}" ) - def run(self): + def run(self) -> None: rate = rospy.Rate(self._update_rate) while not rospy.is_shutdown(): self.publish_to_kafka() rate.sleep() - def shutdown(self): + def shutdown(self) -> None: rospy.loginfo("Shutting down") diff --git a/src/utils.py b/src/utils.py index 05ab14f..34140a5 100755 --- a/src/utils.py +++ b/src/utils.py @@ -13,7 +13,7 @@ def load_yaml_to_dict(yaml_file: str, robot_name: str = "UGV") -> dict: takes yaml file and converts to dict which includes topic names & msg types :param yaml_file (str): path to yaml file - :param robot_name(str, optional): first line of yaml file, default -> 'UGV' + :param robot_name(str, optional): first line of yaml file, default -> 'UGV' :return topic_dict (dict): dictionary mapping topics & msg types example: topic_list.yaml @@ -51,7 +51,7 @@ def load_yaml_to_dict(yaml_file: str, robot_name: str = "UGV") -> dict: def import_msg_type(msg_type: str): """ takes a ros msg_type and dynamically imports the msg type and returns it - + :params msg_type (str): the string identifier for the ROS msg type :return subscriber_type (class): the corresponding ROS msg class :raises ValueError: if msg_type is not found