Skip to content

Commit

Permalink
image pub now srv, adding defined kafka defs for IMS
Browse files Browse the repository at this point in the history
  • Loading branch information
luceharris committed Jan 15, 2025
1 parent 924ee7b commit 91efdd7
Show file tree
Hide file tree
Showing 11 changed files with 351 additions and 71 deletions.
11 changes: 10 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,13 @@ project(ros_kafka_connector)

find_package(catkin REQUIRED)
catkin_python_setup()
catkin_package()
catkin_package(
CATKIN_DEPENDS
rospy
std_msgs
geometry_msgs
sensor_msgs
tf2_msgs
heron_msgs
robotnik_msgs
)
19 changes: 19 additions & 0 deletions config/heron_pilot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
UGV:
- msg_type: "sensor_msgs/Image"
ros_topic: "/robot/arm_camera/front_rgbd_camera/rgb/image_raw"
kafka_topic: "ugv.arm-camera.rgb.image-raw"
- msg_type: "sensor_msgs/Image"
ros_topic: "/robot/body_camera/front_rgbd_camera/rgb/image_raw"
kafka_topic: "ugv.body-camera.rgb.image-raw"
- msg_type: "sensor_msgs/NavSatFix"
ros_topic: "/robot/gps/fix"
kafka_topic: "ugv.gps-fix"
- msg_type: "robotnik_msgs/BatteryStatus"
ros_topic: "/robot/battery_estimator/data"
kafka_topic: "ugv.battery-status"
- msg_type: "robotnik_msgs/State"
ros_topic: "/robot/robot_local_control/RobotStatusComponent/state"
kafka_topic: "ugv.status"
- msg_type: "std_msgs/String"
ros_topic: "/hlp/state"
kafka_topic: "ugv.hlp.status"
7 changes: 7 additions & 0 deletions config/pothole_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
UGV:
- msg_type: "sensor_msgs/Image"
ros_topic: "/wrist_rgbd_camera/rgb/image_raw"
kafka_topic: "ugv.image.arm"
- msg_type: "sensor_msgs/Image"
ros_topic: "/front_rgbd_camera/rgb/image_raw"
kafka_topic: "ugv.image.body"
21 changes: 12 additions & 9 deletions config/topics.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
UGV:
- msg_type: "sensor_msgs/Image"
ros_topic: "/wrist_camera/color/image_raw"
kafka_topic: "wrist_camera"
- msg_type: "tf2_msgs/TFMessage"
ros_topic: "/tf"
kafka_topic: "frames"
- msg_type: "sensor_msgs/JointState"
ros_topic: "/joint_state"
kafka_topic: "arm_joints"
- msg_type: "robotnik_msgs/BatteryStatus"
ros_topic: "/robot/battery_estimator/data"
kafka_topic: "ugv.battery.status"
- msg_type: "sensor_msgs/NavSatFix"
ros_topic: "/robot/gps/fix"
kafka_topic: "ugv.gps.fix"
- msg_type: "robotnik_msgs/State"
ros_topic: "/robot/robot_local_control/RobotStatusComponent/state"
kafka_topic: "ugv.state"
- msg_type: "std_msgs/String"
ros_topic: "/hlp/state"
kafka_topic: "hlp.state"
19 changes: 19 additions & 0 deletions launch/kafka_create_topics.launch
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0"?>

<launch>

<arg name="bootstrap_server" default="10.2.0.8:9092"/>
<arg name="sercurity_protocol" default="PLAINTEXT"/>
<arg name="topics_filename" default="topics.yaml"/>
<arg name="update_rate" default="0.1" />
<arg name="robot_name" default="UGV" />

<node pkg="ros_kafka_connector" type="kafka_create_topics.py" name="kafka_create_topics">
<param name="~bootstrap_server" value="$(arg bootstrap_server)"/>
<param name="~security_protocol" value="$(arg sercurity_protocol)"/>
<param name="~topics_filename" value="$(arg topics_filename)"/>
<param name="~update_rate" value="$(arg update_rate)"/>
<param name="~robot_name" value="$(arg robot_name)" />
</node>

</launch>
20 changes: 13 additions & 7 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@
<license>MIT</license>

<buildtool_depend>catkin</buildtool_depend>
<build_depend>roscpp</build_depend>
<build_depend>rospy</build_depend>
<build_depend>std_msgs</build_depend>
<build_depend>geometry_msgs</build_depend>
<build_depend>sensor_msgs</build_depend>
<build_depend>nav_msgs</build_depend>
<build_depend>tf2_msgs</build_depend>

<depend>rospy</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>sensor_msgs</depend>
<depend>nav_msgs</depend>
<depend>tf2_msgs</depend>

<!-- custom msgs -->
<build_depend>heron_msgs</build_depend>
<build_depend>robotnik_msgs</build_depend>
<exec_depend>heron_msgs</exec_depend>
<exec_depend>robotnik_msgs</exec_depend>
<exec_depend>rospy_message_converter</exec_depend>

<!-- The export tag contains other, unspecified, tags -->
Expand Down
2 changes: 1 addition & 1 deletion scripts/image_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def consume_images(kafka_topic: str, kafka_server: str):

parser = argparse.ArgumentParser()

parser.add_argument("-t", "--topic", default="ugv.image.body", help="Kafka topic name")
parser.add_argument("-t", "--topic", default="test.image.stream", help="Kafka topic name")
parser.add_argument("-s", "--server", default="10.2.0.8:9092", help="Kafka bootstrap server")

args = parser.parse_args()
Expand Down
107 changes: 107 additions & 0 deletions src/kafka_create_topics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#!/usr/bin/env python

import json
import base64
import cv2
from cv_bridge import CvBridge
from kafka import KafkaProducer, KafkaConsumer
from confluent_kafka.admin import AdminClient, NewTopic
import rospy
import rospkg
from rospy_message_converter import json_message_converter
from sensor_msgs.msg import Image
import utils


class KafkaCreateTopics:
"""
takes a yaml file with:
- ros msg types
- ros topic names
- kafka topic names
subscribes to the ros topic, and converts to json format
publishes json msg to the kafka topic
"""

def __init__(self):

rospy.init_node("kafka_topic_creator")

self.load_parameters()
pkg = rospkg.RosPack()
yaml_file = (
pkg.get_path("ros_kafka_connector") + "/config/" + self._filename
)
self.bridge = CvBridge()

topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name)

# initialise admin client to create topics
self.admin_client = AdminClient(
{"bootstrap.servers": self._bootstrap_server}
)

try:
self.admin_client.list_topics(timeout=5)
rospy.loginfo("Kafka connection successful.")
except Exception as err:
rospy.logerr(f"Failed to connect to Kafka: {err}")
rospy.signal_shutdown("Kafka connection failed.")

self.create_kafka_topics(topics_dict)

def load_parameters(self) -> None:
self._filename = rospy.get_param("~topics_filename", "topics.yaml")
self._bootstrap_server = rospy.get_param(
"~bootstrap_server", "10.2.0.8:9092"
)
self._security_protocol = rospy.get_param(
"~security_protocol", "PLAINTEXT"
)
self._update_rate = float(rospy.get_param("~update_rate", "10.0"))
self._robot_name = rospy.get_param("~robot_name", "UGV")

def create_kafka_topics(self, topics_dict: dict) -> None:
"""
creates kafka topics based on config
:param topics_dict (dict): dictionary of kafka & ros topics
"""
kafka_topics = [
topics["kafka_topic"] for topics in topics_dict.values()
]

# check topic doesn't already exist
existing_topics = self.admin_client.list_topics().topics.keys()
new_topics = [
NewTopic(topic, num_partitions=1, replication_factor=1)
for topic in kafka_topics
if topic not in existing_topics
]

if new_topics:
rospy.loginfo(
f"Creating kafka topic {[t.topic for t in new_topics]}"
)
futures = self.admin_client.create_topics(new_topics)

for topic, future in futures.items():
try:
future.result() # wait for op to finish
rospy.loginfo(f"Kafka topic '{topic}' created sucessfully!")
except Exception as err:
rospy.logerr(f"Failed to create topic '{topic}' : {err}")

else:
rospy.logerr("All kafka topics already exist.")

if __name__ == "__main__":

try:
node = KafkaCreateTopics()
except rospy.ROSInterruptException:
pass

rospy.loginfo("Exiting")
109 changes: 109 additions & 0 deletions src/kafka_image_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python

import json
import base64
import cv2
from cv_bridge import CvBridge
from kafka import KafkaProducer, KafkaConsumer
from confluent_kafka.admin import AdminClient, NewTopic
import rospy
import rospkg
from rospy_message_converter import json_message_converter
from sensor_msgs.msg import Image
from std_srvs.srv import Empty

from heron_msgs.srv import (
SendImageToKafka,
SendImageToKafkaRequest,
SendImageToKafkaResponse,
)

class KafkaImagePublisher:

def __init__(self):

rospy.init_node("kafka_image_publisher")

self.load_parameters()
self.bridge = CvBridge()

# start kafka producer
self.producer = KafkaProducer(
bootstrap_servers=self.bootstrap_server_,
security_protocol=self.security_protocol_,
value_serializer=lambda m: json.dumps(m).encode("ascii"),
)

self.bridge = CvBridge()

self.body_img_srv = rospy.Service(
"/kafka/publish_body_image",
SendImageToKafka,
self.send_body_image_cb,
)
self.arm_img_srv = rospy.Service(
"/kafka/publish_arm_image", SendImageToKafka, self.send_arm_image_cb
)

rospy.spin()

def load_parameters(self) -> None:
self.filename_ = rospy.get_param("~topics_filename", "topics.yaml")
self.bootstrap_server_ = rospy.get_param(
"~bootstrap_server", "10.2.0.8:9092"
)
self.security_protocol_ = rospy.get_param(
"~security_protocol", "PLAINTEXT"
)
self.update_rate_ = float(rospy.get_param("~update_rate", "10.0"))
self.rate_ = rospy.Rate(self.update_rate_)
self.robot_name_ = rospy.get_param("~robot_name", "UGV")
self.body_ros_topic_ = "/front_rgbd_camera/rgb/image_raw"
self.arm_ros_topic_ = "/wrist_rgbd_camera/rgb/image_raw"
self.body_kafka_topic_ = "ugv.image.body"
self.arm_kafka_topic_ = "ugv.image.arm"

def send_body_image_cb(
self, req: SendImageToKafkaRequest
) -> SendImageToKafkaResponse:

return self.send_to_kafka(
self.body_kafka_topic_, req.message, req.image
)

def send_arm_image_cb(
self, req: SendImageToKafkaRequest
) -> SendImageToKafkaResponse:

return self.send_to_kafka(self.arm_kafka_topic_, req.message, req.image)

def send_to_kafka(self, kafka_topic, msg, img) -> SendImageToKafkaResponse:
try:
cv_image = self.bridge.imgmsg_to_cv2(
img, desired_encoding="passthrough"
)
_, buffer = cv2.imencode(
".jpg", cv_image, [cv2.IMWRITE_JPEG_QUALITY, 50]
)
base64_image = base64.b64encode(buffer).decode("utf-8")

# Create a json message
json_message = {"message": msg, "image_data": base64_image}
rospy.loginfo(f"Encoded image to Base64 for topic {kafka_topic}")
self.producer.send(kafka_topic, json_message)
return SendImageToKafkaResponse(success=True)
except Exception as err:
rospy.logerr(
f"Failed to process image message for topic {kafka_topic}: {err}"
)
return SendImageToKafkaResponse(success=False)


if __name__ == "__main__":

try:
node = KafkaImagePublisher()
except rospy.ROSInterruptException:
pass

rospy.loginfo("Exiting")
Loading

0 comments on commit 91efdd7

Please sign in to comment.