Skip to content

Commit

Permalink
Merge pull request #1 from ethz-asl/feature/image_srv
Browse files Browse the repository at this point in the history
Ready for initial testing to publish to IMS for HERON pilot
  • Loading branch information
luceharris authored Jan 16, 2025
2 parents 177ab1a + 1316751 commit 164d8f1
Show file tree
Hide file tree
Showing 13 changed files with 370 additions and 85 deletions.
11 changes: 10 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,13 @@ project(ros_kafka_connector)

find_package(catkin REQUIRED)
catkin_python_setup()
catkin_package()
catkin_package(
CATKIN_DEPENDS
rospy
std_msgs
geometry_msgs
sensor_msgs
tf2_msgs
heron_msgs
robotnik_msgs
)
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ topic_dict: {
```
Message types that are added are in the [`utils.py`](https://github.com/ethz-asl/ros-kafka-connector/blob/master/src/utils.py) file. To add new message types to the converter, add cases to `import_msg_type` function. If you have custom messages from ROS, you need to make them callable in your ros workspace.

This package does depend on both [`robotnik_msgs`](https://github.com/RobotnikAutomation/robotnik_msgs/tree/ros-devel) and [`heron_msgs`](https://github.com/RobotnikAutomation/heron_msgs). Make sure to add these packages to your catkin ws.

| Parameter | Info | Default |
| ------------- |:-------------:| -----:|
| bootstrap_server | IP of kafka server | "localhost:9092" |
Expand Down
19 changes: 19 additions & 0 deletions config/heron_pilot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
UGV:
- msg_type: "sensor_msgs/Image"
ros_topic: "/robot/arm_camera/front_rgbd_camera/rgb/image_raw"
kafka_topic: "ugv.arm-camera.rgb.image-raw"
- msg_type: "sensor_msgs/Image"
ros_topic: "/robot/body_camera/front_rgbd_camera/rgb/image_raw"
kafka_topic: "ugv.body-camera.rgb.image-raw"
- msg_type: "sensor_msgs/NavSatFix"
ros_topic: "/robot/gps/fix"
kafka_topic: "ugv.gps-fix"
- msg_type: "robotnik_msgs/BatteryStatus"
ros_topic: "/robot/battery_estimator/data"
kafka_topic: "ugv.battery-status"
- msg_type: "robotnik_msgs/State"
ros_topic: "/robot/robot_local_control/RobotStatusComponent/state"
kafka_topic: "ugv.status"
- msg_type: "std_msgs/String"
ros_topic: "/hlp/state"
kafka_topic: "ugv.hlp.status"
7 changes: 7 additions & 0 deletions config/pothole_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
UGV:
- msg_type: "sensor_msgs/Image"
ros_topic: "/wrist_rgbd_camera/rgb/image_raw"
kafka_topic: "ugv.image.arm"
- msg_type: "sensor_msgs/Image"
ros_topic: "/front_rgbd_camera/rgb/image_raw"
kafka_topic: "ugv.image.body"
21 changes: 12 additions & 9 deletions config/topics.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
UGV:
- msg_type: "sensor_msgs/Image"
ros_topic: "/wrist_camera/color/image_raw"
kafka_topic: "wrist_camera"
- msg_type: "tf2_msgs/TFMessage"
ros_topic: "/tf"
kafka_topic: "frames"
- msg_type: "sensor_msgs/JointState"
ros_topic: "/joint_state"
kafka_topic: "arm_joints"
- msg_type: "robotnik_msgs/BatteryStatus"
ros_topic: "/robot/battery_estimator/data"
kafka_topic: "ugv.battery.status"
- msg_type: "sensor_msgs/NavSatFix"
ros_topic: "/robot/gps/fix"
kafka_topic: "ugv.gps.fix"
- msg_type: "robotnik_msgs/State"
ros_topic: "/robot/robot_local_control/RobotStatusComponent/state"
kafka_topic: "ugv.state"
- msg_type: "std_msgs/String"
ros_topic: "/hlp/state"
kafka_topic: "hlp.state"
19 changes: 19 additions & 0 deletions launch/kafka_create_topics.launch
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0"?>

<launch>

<arg name="bootstrap_server" default="10.2.0.8:9092"/>
<arg name="sercurity_protocol" default="PLAINTEXT"/>
<arg name="topics_filename" default="topics.yaml"/>
<arg name="update_rate" default="0.1" />
<arg name="robot_name" default="UGV" />

<node pkg="ros_kafka_connector" type="kafka_create_topics.py" name="kafka_create_topics">
<param name="~bootstrap_server" value="$(arg bootstrap_server)"/>
<param name="~security_protocol" value="$(arg sercurity_protocol)"/>
<param name="~topics_filename" value="$(arg topics_filename)"/>
<param name="~update_rate" value="$(arg update_rate)"/>
<param name="~robot_name" value="$(arg robot_name)" />
</node>

</launch>
20 changes: 13 additions & 7 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@
<license>MIT</license>

<buildtool_depend>catkin</buildtool_depend>
<build_depend>roscpp</build_depend>
<build_depend>rospy</build_depend>
<build_depend>std_msgs</build_depend>
<build_depend>geometry_msgs</build_depend>
<build_depend>sensor_msgs</build_depend>
<build_depend>nav_msgs</build_depend>
<build_depend>tf2_msgs</build_depend>

<depend>rospy</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>sensor_msgs</depend>
<depend>nav_msgs</depend>
<depend>tf2_msgs</depend>

<!-- custom msgs -->
<build_depend>heron_msgs</build_depend>
<build_depend>robotnik_msgs</build_depend>
<exec_depend>heron_msgs</exec_depend>
<exec_depend>robotnik_msgs</exec_depend>
<exec_depend>rospy_message_converter</exec_depend>

<!-- The export tag contains other, unspecified, tags -->
Expand Down
2 changes: 1 addition & 1 deletion scripts/image_consumer.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def consume_images(kafka_topic: str, kafka_server: str):

parser = argparse.ArgumentParser()

parser.add_argument("-t", "--topic", default="heron.sim-test.wrist-camera", help="Kafka topic name")
parser.add_argument("-t", "--topic", default="test.image.stream", help="Kafka topic name")
parser.add_argument("-s", "--server", default="10.2.0.8:9092", help="Kafka bootstrap server")

args = parser.parse_args()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

# fetch values from package.xml.
setup_args = generate_distutils_setup(
packages=["ros_kafka_connector"],
packages=["ros_kafka_connector"],
package_dir={"": "src"}
)

Expand Down
101 changes: 101 additions & 0 deletions src/kafka_create_topics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#!/usr/bin/env python

from cv_bridge import CvBridge
from confluent_kafka.admin import AdminClient, NewTopic
import rospy
import rospkg
import utils


class KafkaCreateTopics:
"""
takes a yaml file with:
- ros msg types
- ros topic names
- kafka topic names
creates kafka topics
"""

def __init__(self):

rospy.init_node("kafka_topic_creator")

self.load_parameters()
pkg = rospkg.RosPack()
yaml_file = (
pkg.get_path("ros_kafka_connector") + "/config/" + self._filename
)
self.bridge = CvBridge()

topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name)

# initialise admin client to create topics
self.admin_client = AdminClient(
{"bootstrap.servers": self._bootstrap_server}
)

try:
self.admin_client.list_topics(timeout=5)
rospy.loginfo("Kafka connection successful.")
except Exception as err:
rospy.logerr(f"Failed to connect to Kafka: {err}")
rospy.signal_shutdown("Kafka connection failed.")

self.create_kafka_topics(topics_dict)

def load_parameters(self) -> None:
self._filename = rospy.get_param("~topics_filename", "topics.yaml")
self._bootstrap_server = rospy.get_param(
"~bootstrap_server", "10.2.0.8:9092"
)
self._security_protocol = rospy.get_param(
"~security_protocol", "PLAINTEXT"
)
self._update_rate = float(rospy.get_param("~update_rate", "10.0"))
self._robot_name = rospy.get_param("~robot_name", "UGV")

def create_kafka_topics(self, topics_dict: dict) -> None:
"""
creates kafka topics based on config
:param topics_dict (dict): dictionary of kafka & ros topics
"""
kafka_topics = [
topics["kafka_topic"] for topics in topics_dict.values()
]

# check topic doesn't already exist
existing_topics = self.admin_client.list_topics().topics.keys()
new_topics = [
NewTopic(topic, num_partitions=1, replication_factor=1)
for topic in kafka_topics
if topic not in existing_topics
]

if new_topics:
rospy.loginfo(
f"Creating kafka topic {[t.topic for t in new_topics]}"
)
futures = self.admin_client.create_topics(new_topics)

for topic, future in futures.items():
try:
future.result() # wait for op to finish
rospy.loginfo(f"Kafka topic '{topic}' created sucessfully!")
except Exception as err:
rospy.logerr(f"Failed to create topic '{topic}' : {err}")

else:
rospy.logerr("All kafka topics already exist.")


if __name__ == "__main__":

try:
node = KafkaCreateTopics()
except rospy.ROSInterruptException:
pass

rospy.loginfo("Exiting")
119 changes: 119 additions & 0 deletions src/kafka_image_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#!/usr/bin/env python

import json
import base64
import cv2
from cv_bridge import CvBridge
from kafka import KafkaProducer, KafkaConsumer
from confluent_kafka.admin import AdminClient, NewTopic
import rospy
import rospkg
from rospy_message_converter import json_message_converter
from sensor_msgs.msg import Image
from std_srvs.srv import Empty

from heron_msgs.srv import (
SendImageToKafka,
SendImageToKafkaRequest,
SendImageToKafkaResponse,
)

class KafkaImagePublisher:
"""
sends ROS images to kafka with ROS srv call
images in kafka are in the Base64 type.
services will send an image before and after image processing
e.g. pothole detection
hardcoded ros topics & kafka topics
kafka topics should already be created
"""
def __init__(self):

rospy.init_node("kafka_image_publisher")

self.load_parameters()
self.bridge = CvBridge()

# start kafka producer
self.producer = KafkaProducer(
bootstrap_servers=self.bootstrap_server_,
security_protocol=self.security_protocol_,
value_serializer=lambda m: json.dumps(m).encode("ascii"),
)

self.bridge = CvBridge() # start bridge to convert images

# services for before/after image processing
self.body_img_srv = rospy.Service(
"/kafka/publish_body_image",
SendImageToKafka,
self.send_body_image_cb,
)
self.arm_img_srv = rospy.Service(
"/kafka/publish_arm_image", SendImageToKafka, self.send_arm_image_cb
)

rospy.spin()

def load_parameters(self) -> None:
self.filename_ = rospy.get_param("~topics_filename", "topics.yaml")
self.bootstrap_server_ = rospy.get_param(
"~bootstrap_server", "10.2.0.8:9092"
)
self.security_protocol_ = rospy.get_param(
"~security_protocol", "PLAINTEXT"
)
self.update_rate_ = float(rospy.get_param("~update_rate", "10.0"))
self.rate_ = rospy.Rate(self.update_rate_)
self.robot_name_ = rospy.get_param("~robot_name", "UGV")
self.body_ros_topic_ = "/front_rgbd_camera/rgb/image_raw"
self.arm_ros_topic_ = "/wrist_rgbd_camera/rgb/image_raw"
self.body_kafka_topic_ = "ugv.image.body"
self.arm_kafka_topic_ = "ugv.image.arm"

def send_body_image_cb(
self, req: SendImageToKafkaRequest
) -> SendImageToKafkaResponse:

return self.send_to_kafka(
self.body_kafka_topic_, req.message, req.image
)

def send_arm_image_cb(
self, req: SendImageToKafkaRequest
) -> SendImageToKafkaResponse:

return self.send_to_kafka(self.arm_kafka_topic_, req.message, req.image)

def send_to_kafka(self, kafka_topic: str, msg: str, img: Image) -> SendImageToKafkaResponse:
try:
cv_image = self.bridge.imgmsg_to_cv2(
img, desired_encoding="passthrough"
)
_, buffer = cv2.imencode(
".jpg", cv_image, [cv2.IMWRITE_JPEG_QUALITY, 50]
)
base64_image = base64.b64encode(buffer).decode("utf-8")

# Create a json message
json_message = {"message": msg, "image_data": base64_image}
rospy.loginfo(f"Encoded image to Base64 for topic {kafka_topic}")
self.producer.send(kafka_topic, json_message)
return SendImageToKafkaResponse(success=True)
except Exception as err:
rospy.logerr(
f"Failed to process image message for topic {kafka_topic}: {err}"
)
return SendImageToKafkaResponse(success=False)


if __name__ == "__main__":

try:
node = KafkaImagePublisher()
except rospy.ROSInterruptException:
pass

rospy.loginfo("Exiting")
Loading

0 comments on commit 164d8f1

Please sign in to comment.