Skip to content

Commit

Permalink
Merge v0.5.0 (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
peifeng-unity authored Jul 16, 2021
1 parent 18612dc commit f978d94
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 112 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) a

### Fixed

## [0.5.0] - 2021-07-15

### Upgrade Notes

Upgrade the ROS communication to support ROS2 with Unity

### Known Issues

### Added

### Changed

### Deprecated

### Removed

### Fixed

## [0.4.0] - 2021-05-27

Note: the logs only reflects the changes from version 0.3.0
Expand Down
15 changes: 1 addition & 14 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,8 @@ project(ros_tcp_endpoint)
find_package(catkin REQUIRED COMPONENTS
rospy
std_msgs
message_generation
)

catkin_python_setup()

add_message_files(FILES
RosUnityError.msg
RosUnitySrvMessage.msg
RosUnitySysCommand.msg
)

add_service_files(FILES
RosUnityTopicList.srv
)

generate_messages(DEPENDENCIES std_msgs)

catkin_package(CATKIN_DEPENDS message_runtime)
catkin_package(CATKIN_DEPENDS message_runtime )
1 change: 0 additions & 1 deletion msg/RosUnityError.msg

This file was deleted.

4 changes: 0 additions & 4 deletions msg/RosUnitySrvMessage.msg

This file was deleted.

2 changes: 0 additions & 2 deletions msg/RosUnitySysCommand.msg

This file was deleted.

2 changes: 1 addition & 1 deletion package.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<package format="2">
<name>ros_tcp_endpoint</name>
<version>0.4.0</version>
<version>0.5.0</version>
<description>Acts as the bridge between Unity messages sent via Websocket and ROS messages.</description>

<maintainer email="[email protected]">Unity Robotics</maintainer>
Expand Down
98 changes: 56 additions & 42 deletions src/ros_tcp_endpoint/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from io import BytesIO

import threading
import json

from .exceptions import TopicOrServiceNameDoesNotExistError
from ros_tcp_endpoint.msg import RosUnitySrvMessage


class ClientThread(threading.Thread):
Expand Down Expand Up @@ -145,6 +145,47 @@ def serialize_message(destination, message):

return serialized_message

@staticmethod
def serialize_command(command, params):
cmd_bytes = command.encode("utf-8")
cmd_length = len(cmd_bytes)
cmd_info = struct.pack("<I%ss" % cmd_length, cmd_length, cmd_bytes)

json_bytes = json.dumps(params.__dict__).encode("utf-8")
json_length = len(json_bytes)
json_info = struct.pack("<I%ss" % json_length, json_length, json_bytes)

return cmd_info + json_info

def send_ros_service_request(self, srv_id, destination, data):
if destination not in self.tcp_server.source_destination_dict.keys():
error_msg = "Service destination '{}' is not registered! Known topics are: {} ".format(
destination, self.tcp_server.source_destination_dict.keys()
)
self.tcp_server.send_unity_error(error_msg)
rospy.logerr(error_msg)
# TODO: send a response to Unity anyway?
return
else:
ros_communicator = self.tcp_server.source_destination_dict[destination]
service_thread = threading.Thread(
target=self.service_call_thread, args=(srv_id, destination, data, ros_communicator)
)
service_thread.daemon = True
service_thread.start()

def service_call_thread(self, srv_id, destination, data, ros_communicator):
response = ros_communicator.send(data)

if not response:
error_msg = "No response data from service '{}'!".format(destination)
self.tcp_server.send_unity_error(error_msg)
rospy.logerr(error_msg)
# TODO: send a response to Unity anyway?
return

self.tcp_server.unity_tcp_sender.send_ros_service_response(srv_id, destination, response)

def run(self):
"""
Read a message and determine where to send it based on the source_destination_dict
Expand All @@ -167,50 +208,23 @@ def run(self):
while not halt_event.is_set():
destination, data = self.read_message(self.conn)

if destination == "":
if self.tcp_server.pending_srv_id is not None:
# if we've been told that the next message will be a service request/response, process it as such
if self.tcp_server.pending_srv_is_request:
self.send_ros_service_request(
self.tcp_server.pending_srv_id, destination, data
)
else:
self.tcp_server.send_unity_service_response(
self.tcp_server.pending_srv_id, data
)
self.tcp_server.pending_srv_id = None
elif destination == "":
# ignore this keepalive message, listen for more
pass
elif destination == "__syscommand":
elif destination.startswith("__"):
# handle a system command, such as registering new topics
self.tcp_server.handle_syscommand(data)
elif destination == "__srv":
# handle a ros service message request, or a unity service message response
srv_message = RosUnitySrvMessage().deserialize(data)
if not srv_message.is_request:
self.tcp_server.send_unity_service_response(
srv_message.srv_id, srv_message.payload
)
continue
elif srv_message.topic == "__topic_list":
response = self.tcp_server.topic_list(data)
elif srv_message.topic not in self.tcp_server.source_destination_dict.keys():
error_msg = "Service destination '{}' is not registered! Known topics are: {} ".format(
srv_message.topic, self.tcp_server.source_destination_dict.keys()
)
self.tcp_server.send_unity_error(error_msg)
rospy.logerr(error_msg)
# TODO: send a response to Unity anyway?
continue
else:
ros_communicator = self.tcp_server.source_destination_dict[
srv_message.topic
]
response = ros_communicator.send(srv_message.payload)
if not response:
error_msg = "No response data from service '{}'!".format(
srv_message.topic
)
self.tcp_server.send_unity_error(error_msg)
rospy.logerr(error_msg)
# TODO: send a response to Unity anyway?
continue

serial_response = BytesIO()
response.serialize(serial_response)
response_message = RosUnitySrvMessage(
srv_message.srv_id, False, "", serial_response.getvalue()
)
self.tcp_server.unity_tcp_sender.send_unity_message("__srv", response_message)
self.tcp_server.handle_syscommand(destination, data)
elif destination in self.tcp_server.source_destination_dict:
ros_communicator = self.tcp_server.source_destination_dict[destination]
response = ros_communicator.send(data)
Expand Down
Empty file modified src/ros_tcp_endpoint/default_server_endpoint.py
100644 → 100755
Empty file.
34 changes: 20 additions & 14 deletions src/ros_tcp_endpoint/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
from .publisher import RosPublisher
from .service import RosService
from .unity_service import UnityService
from ros_tcp_endpoint.msg import RosUnitySysCommand
from ros_tcp_endpoint.srv import RosUnityTopicListResponse


class TcpServer:
Expand Down Expand Up @@ -61,6 +59,8 @@ def __init__(self, node_name, buffer_size=1024, connections=2, tcp_ip="", tcp_po
self.buffer_size = buffer_size
self.connections = connections
self.syscommands = SysCommands(self)
self.pending_srv_id = None
self.pending_srv_is_request = False

def start(self, source_destination_dict=None):
if source_destination_dict is not None:
Expand Down Expand Up @@ -96,24 +96,19 @@ def send_unity_message(self, topic, message):
self.unity_tcp_sender.send_unity_message(topic, message)

def send_unity_service(self, topic, service_class, request):
return self.unity_tcp_sender.send_unity_service(topic, service_class, request)
return self.unity_tcp_sender.send_unity_service_request(topic, service_class, request)

def send_unity_service_response(self, srv_id, data):
self.unity_tcp_sender.send_unity_service_response(srv_id, data)

def topic_list(self, data):
return RosUnityTopicListResponse(self.source_destination_dict.keys())

def handle_syscommand(self, data):
message = RosUnitySysCommand().deserialize(data)
function = getattr(self.syscommands, message.command)
def handle_syscommand(self, topic, data):
function = getattr(self.syscommands, topic[2:])
if function is None:
self.send_unity_error(
"Don't understand SysCommand.'{}'({})".format(message.command, message.params_json)
)
self.send_unity_error("Don't understand SysCommand.'{}'".format(topic))
return
else:
params = json.loads(message.params_json)
message_json = data.decode("utf-8")
params = json.loads(message_json)
function(**params)


Expand Down Expand Up @@ -220,9 +215,20 @@ def unity_service(self, topic, message_name):
self.tcp_server.source_destination_dict[topic].unregister()

self.tcp_server.source_destination_dict[topic] = UnityService(
topic.encode("ascii"), message_class, self.tcp_server
topic, message_class, self.tcp_server
)

def response(self, srv_id): # the next message is a service response
self.tcp_server.pending_srv_id = srv_id
self.tcp_server.pending_srv_is_request = False

def request(self, srv_id): # the next message is a service request
self.tcp_server.pending_srv_id = srv_id
self.tcp_server.pending_srv_is_request = True

def topic_list(self):
self.tcp_server.unity_tcp_sender.send_topic_list()


def resolve_message_name(name, extension="msg"):
try:
Expand Down
76 changes: 61 additions & 15 deletions src/ros_tcp_endpoint/tcp_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import struct
from .client import ClientThread
from .thread_pauser import ThreadPauser
from ros_tcp_endpoint.msg import RosUnityError
from ros_tcp_endpoint.msg import RosUnitySrvMessage
from io import BytesIO

# queue module was renamed between python 2 and 3
Expand All @@ -34,7 +32,7 @@

class UnityTcpSender:
"""
Connects and sends messages to the server on the Unity side.
Sends messages to Unity.
"""

def __init__(self):
Expand All @@ -51,29 +49,55 @@ def __init__(self):
self.srv_lock = threading.Lock()
self.services_waiting = {}

def send_unity_error(self, error):
self.send_unity_message("__error", RosUnityError(error))
def send_unity_info(self, text):
if self.queue is not None:
command = SysCommand_Log()
command.text = text
serialized_bytes = ClientThread.serialize_command("__log", command)
self.queue.put(serialized_bytes)

def send_unity_warning(self, text):
if self.queue is not None:
command = SysCommand_Log()
command.text = text
serialized_bytes = ClientThread.serialize_command("__warn", command)
self.queue.put(serialized_bytes)

def send_unity_error(self, text):
if self.queue is not None:
command = SysCommand_Log()
command.text = text
serialized_bytes = ClientThread.serialize_command("__error", command)
self.queue.put(serialized_bytes)

def send_ros_service_response(self, srv_id, destination, response):
if self.queue is not None:
command = SysCommand_Service()
command.srv_id = srv_id
serialized_bytes = ClientThread.serialize_command("__response", command)
self.queue.put(serialized_bytes)
self.send_unity_message(destination, response)

def send_unity_message(self, topic, message):
serialized_message = ClientThread.serialize_message(topic, message)
if self.queue is not None:
serialized_message = ClientThread.serialize_message(topic, message)
self.queue.put(serialized_message)

def send_unity_service(self, topic, service_class, request):
request_bytes = BytesIO()
request.serialize(request_bytes)
thread_pauser = ThreadPauser()
def send_unity_service_request(self, topic, service_class, request):
if self.queue is None:
return None

thread_pauser = ThreadPauser()
with self.srv_lock:
srv_id = self.next_srv_id
self.next_srv_id += 1
self.services_waiting[srv_id] = thread_pauser

payload = request_bytes.getvalue()
srv_message = RosUnitySrvMessage(srv_id, True, topic, payload)
serialized_message = ClientThread.serialize_message("__srv", srv_message)
if self.queue is not None:
self.queue.put(serialized_message)
command = SysCommand_Service()
command.srv_id = srv_id
serialized_bytes = ClientThread.serialize_command("__request", command)
self.queue.put(serialized_bytes)
self.send_unity_message(topic, request)

# rospy starts a new thread for each service request,
# so it won't break anything if we sleep now while waiting for the response
Expand All @@ -90,6 +114,15 @@ def send_unity_service_response(self, srv_id, data):

thread_pauser.resume_with_result(data)

def send_topic_list(self):
if self.queue is not None:
topic_list = SysCommand_TopicsResponse()
topics_and_types = rospy.get_published_topics()
topic_list.topics = [item[0] for item in topics_and_types]
topic_list.types = [item[1] for item in topics_and_types]
serialized_bytes = ClientThread.serialize_command("__topic_list", topic_list)
self.queue.put(serialized_bytes)

def start_sender(self, conn, halt_event):
sender_thread = threading.Thread(
target=self.sender_loop, args=(conn, self.sender_id, halt_event)
Expand Down Expand Up @@ -130,3 +163,16 @@ def sender_loop(self, conn, tid, halt_event):
with self.queue_lock:
if self.queue is local_queue:
self.queue = None


class SysCommand_Log:
text = ""


class SysCommand_Service:
srv_id = 0


class SysCommand_TopicsResponse:
topics = []
types = []
2 changes: 0 additions & 2 deletions srv/RosUnityTopicList.srv

This file was deleted.

Loading

0 comments on commit f978d94

Please sign in to comment.