Skip to content

Commit

Permalink
tested pub & sub to kafka w/ image msgs
Browse files Browse the repository at this point in the history
  • Loading branch information
luceharris committed Dec 4, 2024
1 parent 926b1ea commit 177ab1a
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 45 deletions.
74 changes: 66 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# ROS Package: ros_kafka_connector
# ROS Package: `ros_kafka_connector`

This is a ROS package for subscribing or publishing to topics using Kafka.

Expand Down Expand Up @@ -29,23 +29,81 @@ Message types that are added are in the [`utils.py`](https://github.com/ethz-asl
| ------------- |:-------------:| -----:|
| bootstrap_server | IP of kafka server | "localhost:9092" |
| topics_filename | file name of yaml | "topics.yaml" |
| update_rate | update publisher rate | "10.0" |
| update_rate | update publisher rate (ms) | "10.0" |
| robot_name | first line of yaml | "UGV" |


#### installation & running
additional libraries required:
## Installation & running on Ubuntu 20.04
additional libraries can be found in the `requirements.txt` file.

### VPN setup

Download VPN Wireguard:
```bash
sudo apt install wireguard
```

Download the config files for the VpN (sent in email), and copy the folder to `/etc`
```bash
sudo cp /path/to/configs /etc/wireguard/
```

Set the permissions for the files:
```bash
sudo chmod 600 /etc/wireguard/<folder_name>/*
```

**To start the VPN:**
```bahs
sudo wg-quick up /etc/wireguard/heron/wg_heron_2.conf
```
Change the `wg_heron_2.conf` file to any *but* `wg_heron_8.conf`.

Confirm the VPN is up using:
```bash
ping 10.2.0.8
```

**To stop the VPN:**
```bash
sudo wg-quick down /etc/wireguard/heron/wg_heron_2.conf
```
pip install kafka-python pyyaml
sudo apt install ros-noetic-rospy-message-converter

(optional) I added these to my `~/.bashrc`:
```bash
# heron aliases
alias heron_vpn_up='sudo wg-quick up /etc/wireguard/heron/wg_heron_2.conf'
alias heron_vpn_down='sudo wg-quick down /etc/wireguard/heron/wg_heron_2.conf'
```

### Kafka client setup

1. Connect to the VPN
2. Start publishing data to ROS (ROSBag, simulation)

To start publishing kafka topics to ROS:
```console
```bash
roslaunch ros_kafka_connector ros_publisher.launch
```

To start publishing ROS topics to kafka:
```console
```bash
roslaunch ros_kafka_connector kafka_publisher.launch
```
Images will get compressed using opencv to Base64 format. Use the file `scripts/image_consumer.py` to view the images from the kafka messages. This script doesn't require ROS.

### Kafka GUI setup

Install the kakfa UI through docker.

```bash
docker run -d --name=kafka-ui -p 8080:8080 \
-e KAFKA_CLUSTERS_0_NAME=local \
-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=10.2.0.8:9092 \
-e KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXT \
provectuslabs/kafka-ui:latest
```

Access the UI at http://localhost:8080


18 changes: 9 additions & 9 deletions config/sim_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ UGV:
- msg_type: "sensor_msgs/Image"
ros_topic: "/wrist_camera/color/image_raw"
kafka_topic: "heron.sim-test.wrist-camera"
# - msg_type: "sensor_msgs/JointState"
# ros_topic: "/joint_states"
# kafka_topic: "heron.sim-test.arm-joints"
# - msg_type: "nav_msgs/Odometry"
# ros_topic: "/odom"
# kafka_topic: "heron.sim-test.base-odom"
# - msg_type: "sensor_msgs/LaserScan"
# ros_topic: "/front/scan"
# kafka_topic: "heron.sim-test.lidar"
- msg_type: "nav_msgs/Odometry"
ros_topic: "/odom"
kafka_topic: "heron.sim-test.base-odom"
- msg_type: "sensor_msgs/LaserScan"
ros_topic: "/front/scan"
kafka_topic: "heron.sim-test.lidar"
- msg_type: "sensor_msgs/JointState"
ros_topic: "/joint_states"
kafka_topic: "heron.sim-test.arm-joints"
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
sudo apt install ros-noetic-rospy-message-converter
pip install kafka-python pyyaml
pip install confluent-kafka
pip install opencv-python
45 changes: 45 additions & 0 deletions scripts/image_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env python

import base64
import json
import argparse

import cv2
from kafka import KafkaConsumer
import numpy as np

def consume_images(kafka_topic: str, kafka_server: str):

consumer = KafkaConsumer(
kafka_topic,
bootstrap_servers=kafka_server,
value_deserializer=lambda m: json.loads(m.decode("ascii")),
)

print(f"subscribed to Kafka topic: {kafka_topic}")

for message in consumer:
print(f"type of message {type(message)}")
try:
image_data = message.value["image_data"]
image_bytes = base64.b64decode(image_data)
nparr = np.frombuffer(image_bytes, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

if img_rgb is not None:
cv2.imshow("Kafka Image", img_rgb)
cv2.waitKey(1)
except Exception as err:
print(f"Error decoding image: {err}")

if __name__ == "__main__":

parser = argparse.ArgumentParser()

parser.add_argument("-t", "--topic", default="heron.sim-test.wrist-camera", help="Kafka topic name")
parser.add_argument("-s", "--server", default="10.2.0.8:9092", help="Kafka bootstrap server")

args = parser.parse_args()

consume_images(args.topic, args.server)
74 changes: 56 additions & 18 deletions src/kafka_publisher.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#!/usr/bin/env python

import json
import base64
import cv2
from cv_bridge import CvBridge
from kafka import KafkaProducer, KafkaConsumer
from confluent_kafka.admin import AdminClient, NewTopic
import rospy
import rospkg
from rospy_message_converter import json_message_converter
from sensor_msgs.msg import Image
import utils


Expand All @@ -32,6 +36,7 @@ def __init__(self):
yaml_file = (
pkg.get_path("ros_kafka_connector") + "/config/" + self._filename
)
self.bridge = CvBridge()

topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name)

Expand All @@ -42,7 +47,7 @@ def __init__(self):

try:
self.admin_client.list_topics(timeout=5)
rospy.logwarn("Kafka connection successful.")
rospy.loginfo("Kafka connection successful.")
except Exception as err:
rospy.logerr(f"Failed to connect to Kafka: {err}")
rospy.signal_shutdown("Kafka connection failed.")
Expand All @@ -60,14 +65,24 @@ def __init__(self):
ros_topic = topics["ros_topic"]
kafka_topic = topics["kafka_topic"]
msg_class = utils.import_msg_type(msg_type)
rospy.Subscriber(
ros_topic,
msg_class,
lambda msg: self.callback(msg, kafka_topic),
)
rospy.logerr(
f"Using {msg_type} from ROS: {ros_topic} -> KAFKA: {kafka_topic}"
)
if msg_class == Image:
rospy.Subscriber(
ros_topic,
msg_class,
lambda msg, kafka_topic=kafka_topic: self.image_callback(msg, kafka_topic),
)
rospy.loginfo(
f"Subscribing to Image topic: {ros_topic} -> KAFKA: {kafka_topic}"
)
else:
rospy.Subscriber(
ros_topic,
msg_class,
lambda msg, kafka_topic=kafka_topic: self.callback(msg, kafka_topic),
)
rospy.loginfo(
f"Using {msg_type} from ROS: {ros_topic} -> KAFKA: {kafka_topic}"
)

def load_parameters(self) -> None:
self._filename = rospy.get_param("~topics_filename", "topics.yaml")
Expand All @@ -86,29 +101,53 @@ def create_kafka_topics(self, topics_dict: dict) -> None:
:param topics_dict (dict): dictionary of kafka & ros topics
"""
kafka_topics = [topics["kafka_topic"] for topics in topics_dict.values()]
kafka_topics = [
topics["kafka_topic"] for topics in topics_dict.values()
]

# check topic doesn't already exist
existing_topics = self.admin_client.list_topics().topics.keys()
new_topics = [
NewTopic(topic, num_partitions=1, replication_factor=1)
for topic in kafka_topics if topic not in existing_topics
for topic in kafka_topics
if topic not in existing_topics
]

if new_topics:
rospy.logerr(f"Creating kafka topic {[t.topic for t in new_topics]}")
rospy.loginfo(
f"Creating kafka topic {[t.topic for t in new_topics]}"
)
futures = self.admin_client.create_topics(new_topics)

for topic, future in futures.items():
try:
future.result() # wait for op to finish
rospy.logerr(f"Kafka topic '{topic}' created sucessfully!")
future.result() # wait for op to finish
rospy.loginfo(f"Kafka topic '{topic}' created sucessfully!")
except Exception as err:
rospy.logerr(f"Failed to create topic '{topic}' : {err}")

else:
rospy.logerr("All kafka topics already exist.")

def image_callback(self, msg, kafka_topic: str) -> None:
try:
# convert ros image to compressed jpeg and base64 encode
cv_image = self.bridge.imgmsg_to_cv2(
msg, desired_encoding="passthrough"
)
_, buffer = cv2.imencode(
".jpg", cv_image, [cv2.IMWRITE_JPEG_QUALITY, 50]
)
base64_image = base64.b64encode(buffer).decode("utf-8")

# Create a json message
json_message = {"topic": kafka_topic, "image_data": base64_image}
rospy.loginfo(f"Encoded image to Base64 for topic {kafka_topic}")
self.producer.send(kafka_topic, json_message)
except Exception as err:
rospy.logerr(
f"Failed to process image message for topic {kafka_topic}: {err}"
)

def callback(self, msg, kafka_topic: str) -> None:
"""
Expand All @@ -118,11 +157,10 @@ def callback(self, msg, kafka_topic: str) -> None:
:param kafka_topic (str): kafka topic name
"""
rospy.logerr(f"Received message on ROS topic -> {kafka_topic}")
rospy.loginfo(f"Received message on ROS topic -> {kafka_topic}")

json_str = json_message_converter.convert_ros_message_to_json(msg)
rospy.logerr(f"Converted msgs: {json_str}...")


self.producer.send(kafka_topic, json_str)

def run(self):
Expand Down
10 changes: 0 additions & 10 deletions src/topics.yaml

This file was deleted.

0 comments on commit 177ab1a

Please sign in to comment.