From 926b1eaa6406e7b3e0dbc61de2d70d95c5e3d7f0 Mon Sep 17 00:00:00 2001 From: Lucy Harris Date: Tue, 3 Dec 2024 17:06:08 +0100 Subject: [PATCH] can publish to kafka, few bugs --- config/sim_test.yaml | 13 ++++++ launch/kafka_publisher.launch | 6 ++- launch/ros_publisher.launch | 4 +- requirements.txt | 3 +- src/kafka_publisher.py | 76 +++++++++++++++++++++++++++++------ src/ros_publisher.py | 13 ++++-- 6 files changed, 95 insertions(+), 20 deletions(-) create mode 100644 config/sim_test.yaml diff --git a/config/sim_test.yaml b/config/sim_test.yaml new file mode 100644 index 0000000..5cc1d0b --- /dev/null +++ b/config/sim_test.yaml @@ -0,0 +1,13 @@ +UGV: + - msg_type: "sensor_msgs/Image" + ros_topic: "/wrist_camera/color/image_raw" + kafka_topic: "heron.sim-test.wrist-camera" + # - msg_type: "sensor_msgs/JointState" + # ros_topic: "/joint_states" + # kafka_topic: "heron.sim-test.arm-joints" + # - msg_type: "nav_msgs/Odometry" + # ros_topic: "/odom" + # kafka_topic: "heron.sim-test.base-odom" + # - msg_type: "sensor_msgs/LaserScan" + # ros_topic: "/front/scan" + # kafka_topic: "heron.sim-test.lidar" diff --git a/launch/kafka_publisher.launch b/launch/kafka_publisher.launch index 0f88b58..8203293 100644 --- a/launch/kafka_publisher.launch +++ b/launch/kafka_publisher.launch @@ -2,13 +2,15 @@ - + + - + + diff --git a/launch/ros_publisher.launch b/launch/ros_publisher.launch index 374b487..5f56f4f 100644 --- a/launch/ros_publisher.launch +++ b/launch/ros_publisher.launch @@ -2,12 +2,14 @@ - + + + diff --git a/requirements.txt b/requirements.txt index 091375c..a228f13 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ sudo apt install ros-noetic-rospy-message-converter -pip install kafka-python pyyaml \ No newline at end of file +pip install kafka-python pyyaml +pip install confluent-kafka diff --git a/src/kafka_publisher.py b/src/kafka_publisher.py index c40f266..cb57a68 100755 --- a/src/kafka_publisher.py +++ b/src/kafka_publisher.py @@ -1,8 +1,8 @@ #!/usr/bin/env python import json -from kafka import KafkaProducer -from kafka import KafkaConsumer +from kafka import KafkaProducer, KafkaConsumer +from confluent_kafka.admin import AdminClient, NewTopic import rospy import rospkg from rospy_message_converter import json_message_converter @@ -33,13 +33,28 @@ def __init__(self): pkg.get_path("ros_kafka_connector") + "/config/" + self._filename ) - topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) + topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name) + + # initialise admin client to create topics + self.admin_client = AdminClient( + {"bootstrap.servers": self._bootstrap_server} + ) + + try: + self.admin_client.list_topics(timeout=5) + rospy.logwarn("Kafka connection successful.") + except Exception as err: + rospy.logerr(f"Failed to connect to Kafka: {err}") + rospy.signal_shutdown("Kafka connection failed.") + + self.create_kafka_topics(topics_dict) # start kafka producer - # self.producer = KafkaProducer( - # bootstrap_servers=self._bootstrap_server, - # value_serializer=lambda m: json.dumps(m).encode('ascii') - # ) + self.producer = KafkaProducer( + bootstrap_servers=self._bootstrap_server, + security_protocol=self._security_protocol, + value_serializer=lambda m: json.dumps(m).encode("ascii"), + ) for msg_type, topics in topics_dict.items(): ros_topic = topics["ros_topic"] @@ -50,28 +65,65 @@ def __init__(self): msg_class, lambda msg: self.callback(msg, kafka_topic), ) - rospy.loginfo( + rospy.logerr( f"Using {msg_type} from ROS: {ros_topic} -> KAFKA: {kafka_topic}" ) def load_parameters(self) -> None: self._filename = rospy.get_param("~topics_filename", "topics.yaml") self._bootstrap_server = rospy.get_param( - "~bootstrap_server", "localhost:9092" + "~bootstrap_server", "10.2.0.8:9092" + ) + self._security_protocol = rospy.get_param( + "~security_protocol", "PLAINTEXT" ) self._update_rate = float(rospy.get_param("~update_rate", "10.0")) self._robot_name = rospy.get_param("~robot_name", "UGV") + def create_kafka_topics(self, topics_dict: dict) -> None: + """ + creates kafka topics based on config + + :param topics_dict (dict): dictionary of kafka & ros topics + """ + kafka_topics = [topics["kafka_topic"] for topics in topics_dict.values()] + + # check topic doesn't already exist + existing_topics = self.admin_client.list_topics().topics.keys() + new_topics = [ + NewTopic(topic, num_partitions=1, replication_factor=1) + for topic in kafka_topics if topic not in existing_topics + ] + + if new_topics: + rospy.logerr(f"Creating kafka topic {[t.topic for t in new_topics]}") + futures = self.admin_client.create_topics(new_topics) + + for topic, future in futures.items(): + try: + future.result() # wait for op to finish + rospy.logerr(f"Kafka topic '{topic}' created sucessfully!") + except Exception as err: + rospy.logerr(f"Failed to create topic '{topic}' : {err}") + + else: + rospy.logerr("All kafka topics already exist.") + + def callback(self, msg, kafka_topic: str) -> None: """ takes msg from ros, converts to json, publishes to kafka - - :param msg: ros msg from subscriber + + :param msg: ros msg from subscriber :param kafka_topic (str): kafka topic name """ + rospy.logerr(f"Received message on ROS topic -> {kafka_topic}") + json_str = json_message_converter.convert_ros_message_to_json(msg) - # self.producer.send(kafka_topic, json_str) + rospy.logerr(f"Converted msgs: {json_str}...") + + self.producer.send(kafka_topic, json_str) def run(self): rate = rospy.Rate(self._update_rate) diff --git a/src/ros_publisher.py b/src/ros_publisher.py index 6eae28c..cf55ab3 100755 --- a/src/ros_publisher.py +++ b/src/ros_publisher.py @@ -32,6 +32,7 @@ def __init__(self): consumer = KafkaConsumer( kafka_topic, bootstrap_servers=self._bootstrap_server, + security_protocol=self._security_protocol, value_deserializer=lambda m: json.loads(m.decode("ascii")), auto_offset_reset="latest", consumer_timeout_ms=5000, @@ -48,10 +49,12 @@ def __init__(self): def load_parameters(self) -> None: self._filename = rospy.get_param("~topics_filename", "topics.yaml") self._bootstrap_server = rospy.get_param( - "~bootstrap_server", "localhost:9092" + "~bootstrap_server", "10.2.0.8:9092" + ) + self._security_protocol = rospy.get_param( + "~security_protocol", "PLAINTEXT" ) self._robot_name = rospy.get_param("~robot_name", "UGV") - def run(self): @@ -59,8 +62,10 @@ def run(self): for consumer, publisher in zip(self.consumers, self.publishers): for msg in consumer: json_str = json.dump(msg.value) - ros_msg = json_message_converter.convert_json_to_ros_message( - publisher.type, json_str + ros_msg = ( + json_message_converter.convert_json_to_ros_message( + publisher.type, json_str + ) ) publisher.publish(ros_msg)