Skip to content

Commit

Permalink
can publish to kafka, few bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
luceharris committed Dec 3, 2024
1 parent 14fc7e3 commit 926b1ea
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 20 deletions.
13 changes: 13 additions & 0 deletions config/sim_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
UGV:
- msg_type: "sensor_msgs/Image"
ros_topic: "/wrist_camera/color/image_raw"
kafka_topic: "heron.sim-test.wrist-camera"
# - msg_type: "sensor_msgs/JointState"
# ros_topic: "/joint_states"
# kafka_topic: "heron.sim-test.arm-joints"
# - msg_type: "nav_msgs/Odometry"
# ros_topic: "/odom"
# kafka_topic: "heron.sim-test.base-odom"
# - msg_type: "sensor_msgs/LaserScan"
# ros_topic: "/front/scan"
# kafka_topic: "heron.sim-test.lidar"
6 changes: 4 additions & 2 deletions launch/kafka_publisher.launch
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

<launch>

<arg name="bootstrap_server" default="localhost:9092"/>
<arg name="bootstrap_server" default="10.2.0.8:9092"/>
<arg name="sercurity_protocol" default="PLAINTEXT"/>
<arg name="topics_filename" default="topics.yaml"/>
<arg name="update_rate" default="10.0" />
<arg name="update_rate" default="0.1" />
<arg name="robot_name" default="UGV" />

<node pkg="ros_kafka_connector" type="kafka_publisher.py" name="kafka_publisher">
<param name="~bootstrap_server" value="$(arg bootstrap_server)"/>
<param name="~security_protocol" value="$(arg sercurity_protocol)"/>
<param name="~topics_filename" value="$(arg topics_filename)"/>
<param name="~update_rate" value="$(arg update_rate)"/>
<param name="~robot_name" value="$(arg robot_name)" />
Expand Down
4 changes: 3 additions & 1 deletion launch/ros_publisher.launch
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

<launch>

<arg name="bootstrap_server" default="localhost:9092"/>
<arg name="bootstrap_server" default="10.2.0.8:9092"/>
<arg name="sercurity_protocol" default="PLAINTEXT"/>
<arg name="topics_filename" default="topics.yaml"/>
<arg name="robot_name" default="UGV" />

<node pkg="ros_kafka_connector" type="ros_publisher.py" name="ros_publisher">
<param name="~bootstrap_server" value="$(arg bootstrap_server)"/>
<param name="~security_protocol" value="$(arg sercurity_protocol)"/>
<param name="~topics_filename" value="$(arg topics_filename)"/>
<param name="~robot_name" value="$(arg robot_name)" />
</node>
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
sudo apt install ros-noetic-rospy-message-converter
pip install kafka-python pyyaml
pip install kafka-python pyyaml
pip install confluent-kafka
76 changes: 64 additions & 12 deletions src/kafka_publisher.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/usr/bin/env python

import json
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka import KafkaProducer, KafkaConsumer
from confluent_kafka.admin import AdminClient, NewTopic
import rospy
import rospkg
from rospy_message_converter import json_message_converter
Expand Down Expand Up @@ -33,13 +33,28 @@ def __init__(self):
pkg.get_path("ros_kafka_connector") + "/config/" + self._filename
)

topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name)
topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name)

# initialise admin client to create topics
self.admin_client = AdminClient(
{"bootstrap.servers": self._bootstrap_server}
)

try:
self.admin_client.list_topics(timeout=5)
rospy.logwarn("Kafka connection successful.")
except Exception as err:
rospy.logerr(f"Failed to connect to Kafka: {err}")
rospy.signal_shutdown("Kafka connection failed.")

self.create_kafka_topics(topics_dict)

# start kafka producer
# self.producer = KafkaProducer(
# bootstrap_servers=self._bootstrap_server,
# value_serializer=lambda m: json.dumps(m).encode('ascii')
# )
self.producer = KafkaProducer(
bootstrap_servers=self._bootstrap_server,
security_protocol=self._security_protocol,
value_serializer=lambda m: json.dumps(m).encode("ascii"),
)

for msg_type, topics in topics_dict.items():
ros_topic = topics["ros_topic"]
Expand All @@ -50,28 +65,65 @@ def __init__(self):
msg_class,
lambda msg: self.callback(msg, kafka_topic),
)
rospy.loginfo(
rospy.logerr(
f"Using {msg_type} from ROS: {ros_topic} -> KAFKA: {kafka_topic}"
)

def load_parameters(self) -> None:
self._filename = rospy.get_param("~topics_filename", "topics.yaml")
self._bootstrap_server = rospy.get_param(
"~bootstrap_server", "localhost:9092"
"~bootstrap_server", "10.2.0.8:9092"
)
self._security_protocol = rospy.get_param(
"~security_protocol", "PLAINTEXT"
)
self._update_rate = float(rospy.get_param("~update_rate", "10.0"))
self._robot_name = rospy.get_param("~robot_name", "UGV")

def create_kafka_topics(self, topics_dict: dict) -> None:
"""
creates kafka topics based on config
:param topics_dict (dict): dictionary of kafka & ros topics
"""
kafka_topics = [topics["kafka_topic"] for topics in topics_dict.values()]

# check topic doesn't already exist
existing_topics = self.admin_client.list_topics().topics.keys()
new_topics = [
NewTopic(topic, num_partitions=1, replication_factor=1)
for topic in kafka_topics if topic not in existing_topics
]

if new_topics:
rospy.logerr(f"Creating kafka topic {[t.topic for t in new_topics]}")
futures = self.admin_client.create_topics(new_topics)

for topic, future in futures.items():
try:
future.result() # wait for op to finish
rospy.logerr(f"Kafka topic '{topic}' created sucessfully!")
except Exception as err:
rospy.logerr(f"Failed to create topic '{topic}' : {err}")

else:
rospy.logerr("All kafka topics already exist.")


def callback(self, msg, kafka_topic: str) -> None:
"""
takes msg from ros, converts to json, publishes to kafka
:param msg: ros msg from subscriber
:param msg: ros msg from subscriber
:param kafka_topic (str): kafka topic name
"""
rospy.logerr(f"Received message on ROS topic -> {kafka_topic}")

json_str = json_message_converter.convert_ros_message_to_json(msg)
# self.producer.send(kafka_topic, json_str)
rospy.logerr(f"Converted msgs: {json_str}...")

self.producer.send(kafka_topic, json_str)

def run(self):
rate = rospy.Rate(self._update_rate)
Expand Down
13 changes: 9 additions & 4 deletions src/ros_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self):
consumer = KafkaConsumer(
kafka_topic,
bootstrap_servers=self._bootstrap_server,
security_protocol=self._security_protocol,
value_deserializer=lambda m: json.loads(m.decode("ascii")),
auto_offset_reset="latest",
consumer_timeout_ms=5000,
Expand All @@ -48,19 +49,23 @@ def __init__(self):
def load_parameters(self) -> None:
self._filename = rospy.get_param("~topics_filename", "topics.yaml")
self._bootstrap_server = rospy.get_param(
"~bootstrap_server", "localhost:9092"
"~bootstrap_server", "10.2.0.8:9092"
)
self._security_protocol = rospy.get_param(
"~security_protocol", "PLAINTEXT"
)
self._robot_name = rospy.get_param("~robot_name", "UGV")


def run(self):

while not rospy.is_shutdown():
for consumer, publisher in zip(self.consumers, self.publishers):
for msg in consumer:
json_str = json.dump(msg.value)
ros_msg = json_message_converter.convert_json_to_ros_message(
publisher.type, json_str
ros_msg = (
json_message_converter.convert_json_to_ros_message(
publisher.type, json_str
)
)
publisher.publish(ros_msg)

Expand Down

0 comments on commit 926b1ea

Please sign in to comment.