Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
luceharris committed Jan 16, 2025
1 parent 91efdd7 commit 1316751
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 29 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ topic_dict: {
```
Message types that are added are in the [`utils.py`](https://github.com/ethz-asl/ros-kafka-connector/blob/master/src/utils.py) file. To add new message types to the converter, add cases to `import_msg_type` function. If you have custom messages from ROS, you need to make them callable in your ros workspace.

This package does depend on both [`robotnik_msgs`](https://github.com/RobotnikAutomation/robotnik_msgs/tree/ros-devel) and [`heron_msgs`](https://github.com/RobotnikAutomation/heron_msgs). Make sure to add these packages to your catkin ws.

| Parameter | Info | Default |
| ------------- |:-------------:| -----:|
| bootstrap_server | IP of kafka server | "localhost:9092" |
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

# fetch values from package.xml.
setup_args = generate_distutils_setup(
packages=["ros_kafka_connector"],
packages=["ros_kafka_connector"],
package_dir={"": "src"}
)

Expand Down
10 changes: 2 additions & 8 deletions src/kafka_create_topics.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
#!/usr/bin/env python

import json
import base64
import cv2
from cv_bridge import CvBridge
from kafka import KafkaProducer, KafkaConsumer
from confluent_kafka.admin import AdminClient, NewTopic
import rospy
import rospkg
from rospy_message_converter import json_message_converter
from sensor_msgs.msg import Image
import utils


Expand All @@ -20,8 +14,7 @@ class KafkaCreateTopics:
- ros topic names
- kafka topic names
subscribes to the ros topic, and converts to json format
publishes json msg to the kafka topic
creates kafka topics
"""

Expand Down Expand Up @@ -97,6 +90,7 @@ def create_kafka_topics(self, topics_dict: dict) -> None:
else:
rospy.logerr("All kafka topics already exist.")


if __name__ == "__main__":

try:
Expand Down
14 changes: 12 additions & 2 deletions src/kafka_image_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,16 @@
)

class KafkaImagePublisher:
"""
sends ROS images to kafka with ROS srv call
images in kafka are in the Base64 type.
services will send an image before and after image processing
e.g. pothole detection
hardcoded ros topics & kafka topics
kafka topics should already be created
"""
def __init__(self):

rospy.init_node("kafka_image_publisher")
Expand All @@ -34,8 +43,9 @@ def __init__(self):
value_serializer=lambda m: json.dumps(m).encode("ascii"),
)

self.bridge = CvBridge()
self.bridge = CvBridge() # start bridge to convert images

# services for before/after image processing
self.body_img_srv = rospy.Service(
"/kafka/publish_body_image",
SendImageToKafka,
Expand Down Expand Up @@ -77,7 +87,7 @@ def send_arm_image_cb(

return self.send_to_kafka(self.arm_kafka_topic_, req.message, req.image)

def send_to_kafka(self, kafka_topic, msg, img) -> SendImageToKafkaResponse:
def send_to_kafka(self, kafka_topic: str, msg: str, img: Image) -> SendImageToKafkaResponse:
try:
cv_image = self.bridge.imgmsg_to_cv2(
img, desired_encoding="passthrough"
Expand Down
31 changes: 15 additions & 16 deletions src/kafka_publisher.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#!/usr/bin/env python

import json
import base64
import cv2
from cv_bridge import CvBridge
from kafka import KafkaProducer, KafkaConsumer
from confluent_kafka.admin import AdminClient, NewTopic
import rospy
import rospkg
import json

from typing import Dict, Any
from kafka import KafkaProducer
from confluent_kafka.admin import AdminClient, NewTopic
from rospy_message_converter import json_message_converter
from sensor_msgs.msg import Image

import utils


Expand All @@ -25,7 +25,7 @@ class KafkaPublisher:
"""

def __init__(self):
def __init__(self) -> None:

rospy.init_node("kafka_publisher")
rospy.on_shutdown(self.shutdown)
Expand All @@ -35,7 +35,6 @@ def __init__(self):
yaml_file = (
pkg.get_path("ros_kafka_connector") + "/config/" + self._filename
)
self.bridge = CvBridge()

self.topics_dict = utils.load_yaml_to_dict(yaml_file, self._robot_name)

Expand Down Expand Up @@ -65,7 +64,7 @@ def __init__(self):
details["ros_topic"]: None for details in self.topics_dict.values()
}

# Subscribers for all topics
# subscribers for all topics
for msg_type, details in self.topics_dict.items():
ros_topic = details["ros_topic"]
msg_class = utils.import_msg_type(msg_type)
Expand All @@ -90,7 +89,7 @@ def load_parameters(self) -> None:
self._update_rate = float(rospy.get_param("~update_rate", "10.0"))
self._robot_name = rospy.get_param("~robot_name", "UGV")

def create_kafka_topics(self, topics_dict: dict) -> None:
def create_kafka_topics(self, topics_dict: Dict[str, Dict[str, str]]) -> None:
"""
creates kafka topics based on config
Expand Down Expand Up @@ -124,13 +123,13 @@ def create_kafka_topics(self, topics_dict: dict) -> None:
else:
rospy.logerr("All kafka topics already exist.")

def message_callback(self, msg, ros_topic):
def message_callback(self, msg: Any, ros_topic: str) -> None:
"""
stores latest ros msg
"""
self.latest_msgs[ros_topic] = msg

def publish_to_kafka(self):
def publish_to_kafka(self) -> None:
"""
publish the latest messages to their respective Kafka topics.
"""
Expand All @@ -140,10 +139,10 @@ def publish_to_kafka(self):
msg = self.latest_msgs[ros_topic]

if msg is None:
continue # Skip if no message has been received yet
continue # skip if no message has been received yet

try:
# Convert other messages to JSON
# convert messages to JSON
json_message = (
json_message_converter.convert_ros_message_to_json(msg)
)
Expand All @@ -155,13 +154,13 @@ def publish_to_kafka(self):
f"Failed to publish message from {ros_topic} to {kafka_topic}: {e}"
)

def run(self):
def run(self) -> None:
rate = rospy.Rate(self._update_rate)
while not rospy.is_shutdown():
self.publish_to_kafka()
rate.sleep()

def shutdown(self):
def shutdown(self) -> None:
rospy.loginfo("Shutting down")


Expand Down
4 changes: 2 additions & 2 deletions src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def load_yaml_to_dict(yaml_file: str, robot_name: str = "UGV") -> dict:
takes yaml file and converts to dict which includes topic names & msg types
:param yaml_file (str): path to yaml file
:param robot_name(str, optional): first line of yaml file, default -> 'UGV'
:param robot_name(str, optional): first line of yaml file, default -> 'UGV'
:return topic_dict (dict): dictionary mapping topics & msg types
example: topic_list.yaml
Expand Down Expand Up @@ -51,7 +51,7 @@ def load_yaml_to_dict(yaml_file: str, robot_name: str = "UGV") -> dict:
def import_msg_type(msg_type: str):
"""
takes a ros msg_type and dynamically imports the msg type and returns it
:params msg_type (str): the string identifier for the ROS msg type
:return subscriber_type (class): the corresponding ROS msg class
:raises ValueError: if msg_type is not found
Expand Down

0 comments on commit 1316751

Please sign in to comment.