
FastDDS High Latency using Large Data #212

Open
andreclerigo opened this issue Feb 26, 2025 · 0 comments
When using FastDDS to transfer images (~4 MB) between participants, the measured latency (~60 ms) is worse than with other tools such as Zenoh or ZeroMQ (~40-50 ms). This should not happen, since FastDDS leverages SHM and the others don't.

Setup

I am using FastDDS docker images, something similar to #134.
I am using eProsima_Fast-DDS-v2.13.3-Linux.tgz and https://raw.githubusercontent.com/eProsima/Fast-DDS-python/main/fastdds_python.repos as the base resources for the docker image.
I deploy the containers as follows:
docker run -d --ipc shareable --name ipc_share_container alpine sleep infinity
docker run -itd --network host --ipc container:ipc_share_container --name app1
docker run -itd --network host --ipc container:ipc_share_container --name app2
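Since both app containers join the IPC namespace of ipc_share_container, any SHM segments Fast DDS creates should be visible in the shared /dev/shm of either container. As a quick sanity check, here is a hypothetical helper (the `fastrtps_` file-name prefix is an assumption based on Fast DDS's conventional segment naming and may differ across versions):

```python
import os

def filter_fastdds_segments(names):
    """Keep only file names that look like Fast DDS SHM segments.

    The 'fastrtps_' prefix is an assumption; verify against your
    Fast DDS version's actual segment naming.
    """
    return [n for n in names if n.startswith("fastrtps_")]

def list_fastdds_shm(shm_dir="/dev/shm"):
    """List candidate Fast DDS shared-memory segment files."""
    try:
        return filter_fastdds_segments(os.listdir(shm_dir))
    except FileNotFoundError:
        return []

# Run inside either app container while both participants are up;
# an empty list suggests the SHM transport is not actually in use
# and traffic may be falling back to UDP.
print(list_fastdds_shm())
```

If the list is empty while both participants are running, latency is likely dominated by the UDP path rather than SHM.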

Participants

Using the following XML file

<?xml version="1.0" encoding="utf-8" ?>
<profiles xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
    <!-- Descriptors for new transports -->
    <transport_descriptors>
        <!-- UDP new transport -->
        <transport_descriptor>
            <transport_id>standard_udp_transport</transport_id>
            <type>UDPv4</type>
            <TTL>250</TTL>
        </transport_descriptor>
        <!-- Create a descriptor for the new transport -->
        <transport_descriptor>
            <transport_id>shm_transport</transport_id>
            <type>SHM</type> <!-- REQUIRED -->
            <maxMessageSize>524288</maxMessageSize> <!-- OPTIONAL uint32 valid of all transports-->
            <segment_size>1048576</segment_size> <!-- OPTIONAL uint32 SHM only-->
            <port_queue_capacity>1024</port_queue_capacity> <!-- OPTIONAL uint32 SHM only-->
            <healthy_check_timeout_ms>250</healthy_check_timeout_ms> <!-- OPTIONAL uint32 SHM only-->
            <default_reception_threads> <!-- OPTIONAL -->
                <scheduling_policy>-1</scheduling_policy>
                <priority>0</priority>
                <affinity>0</affinity>
                <stack_size>-1</stack_size>
            </default_reception_threads>
            <reception_threads> <!-- OPTIONAL -->
                <reception_thread port="12345">
                    <scheduling_policy>-1</scheduling_policy>
                    <priority>0</priority>
                    <affinity>0</affinity>
                    <stack_size>-1</stack_size>
                </reception_thread>
            </reception_threads>
        </transport_descriptor>
    </transport_descriptors>

    <participant profile_name="large_data_builtin_transports_options" is_default_profile="true">
        <rtps>
            <sendSocketBufferSize>1048576</sendSocketBufferSize>
            <listenSocketBufferSize>4194304</listenSocketBufferSize>
            <builtinTransports max_msg_size="310KB" sockets_size="310KB" non_blocking="true" tcp_negotiation_timeout="50">LARGE_DATA</builtinTransports>
        </rtps>
    </participant>

    <participant profile_name="video_publisher_qos">
        <rtps>
            <!-- Link the Transport Layer to the Participant -->
            <userTransports>
                <transport_id>shm_transport</transport_id>
                <transport_id>standard_udp_transport</transport_id>
            </userTransports>
            <useBuiltinTransports>false</useBuiltinTransports>
        </rtps>
    </participant>
</profiles>
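One thing worth double-checking against the values above: a ~4 MB sample far exceeds both `maxMessageSize` (512 KB) and `segment_size` (1 MB), so each image is split into RTPS fragments, and the SHM segment can only hold a couple of fragment-sized buffers at a time, which can throttle the writer. A rough back-of-the-envelope, assuming the XML values are the ones actually applied at runtime:

```python
import math

SAMPLE_BYTES = 4 * 1024 * 1024   # ~4 MB image payload
MAX_MESSAGE_SIZE = 524288        # <maxMessageSize> from the XML above
SEGMENT_SIZE = 1048576           # <segment_size> from the XML above

# Samples larger than maxMessageSize are split into multiple fragments,
# each of which travels through the transport separately.
fragments = math.ceil(SAMPLE_BYTES / MAX_MESSAGE_SIZE)
print(f"fragments per sample: {fragments}")            # 8

# The 1 MB SHM segment fits only two 512 KB fragment buffers at once,
# so the writer may stall waiting for the reader to free buffers.
buffers_in_segment = SEGMENT_SIZE // MAX_MESSAGE_SIZE
print(f"fragment buffers per segment: {buffers_in_segment}")  # 2
```

Increasing `segment_size` (and `maxMessageSize`) so a whole sample fits in the segment is a common first experiment when tuning SHM for large payloads.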

We have tested participants with both the large_data_builtin_transports_options and the video_publisher_qos profiles, and the results were about the same.

Sample code for the publisher:

class Writer:
    def __init__(self, profile: str, data_name: str, topic_name: str) -> None:
        self._matched_reader = 0
        self._cvDiscovery = Condition()
        self.participant_handles = fastdds.InstanceHandleVector()  # Store handles here

        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant_qos = fastdds.DomainParticipantQos()
        factory.get_default_participant_qos(self.participant_qos)
        participant = factory.create_participant_with_profile(profile)
        self.participant = participant
        
        # Register the DDS data type
        if data_name == "SensorMessage":
            self.topic_data_type = SensorMessage.SensorMessagePubSubType()
        else:
            raise ValueError("Invalid data name")
        
        self.topic_data_type.setName(data_name)
        self.type_support = fastdds.TypeSupport(self.topic_data_type)
        self.participant.register_type(self.type_support)
        self.topic_qos = fastdds.TopicQos()
        self.participant.get_default_topic_qos(self.topic_qos)
        self.topic = self.participant.create_topic(topic_name, self.topic_data_type.getName(), self.topic_qos)
        self.publisher_qos = fastdds.PublisherQos()
        self.participant.get_default_publisher_qos(self.publisher_qos)
        self.publisher = self.participant.create_publisher(self.publisher_qos)
        self.listener = WriterListener(self)
        self.writer_qos = fastdds.DataWriterQos()
        self.publisher.get_default_datawriter_qos(self.writer_qos)

        self.writer = self.publisher.create_datawriter(self.topic, self.writer_qos, self.listener)
...

In the main code:

def write(self, data: SensorFormat):
    super().write()

    # note: 'object' shadows a Python built-in; a different name is preferable
    message: SensorMessage.SensorMessage = SensorMsgOperations.BuildSensorMessage(data)
    self.writer.write(message)

SensorMsgOperations uses this IDL File

struct SensorMessage 
{
    string serializedObjectTypeDef;
    long serializedObjectID;
    sequence<octet> serializedObject;
    string uuid;
};

and is built like this:

fastddsgen -python SensorMessage.idl

mkdir -p build && cd build

cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local/ -DBUILD_SHARED_LIBS=ON
cmake --build .
cd ..
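For completeness, the serialization overhead of the IDL struct itself is negligible next to the 4 MB payload; latency differences are unlikely to come from the message framing. A rough CDR size estimate (a hypothetical helper; 4-byte alignment padding is ignored, so treat the numbers as approximate):

```python
def cdr_string_size(s: str) -> int:
    """Approximate XCDR string size: 4-byte length prefix + bytes + NUL."""
    return 4 + len(s.encode()) + 1

def estimate_sensor_message_size(type_def: str, payload_len: int, uuid: str) -> int:
    """Approximate serialized size of the SensorMessage struct above."""
    size = cdr_string_size(type_def)   # serializedObjectTypeDef
    size += 4                          # serializedObjectID (long)
    size += 4 + payload_len            # sequence<octet>: length prefix + data
    size += cdr_string_size(uuid)      # uuid
    return size

# A ~4 MB image dominates; per-message framing overhead is tens of bytes.
print(estimate_sensor_message_size("Image", 4 * 1024 * 1024, "x" * 36))
```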

Sample code for the subscriber:

class Reader:
    def __init__(self, profile: str, data_name: str, topic_name: str, listener: Optional[ReaderListener] = None) -> None:
        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant_qos = fastdds.DomainParticipantQos()
        factory.get_default_participant_qos(self.participant_qos)
        participant = factory.create_participant_with_profile(profile)
        self.participant = participant

        # Register the DDS data type
        if data_name == "SensorMessage":
            self.topic_data_type = SensorMessage.SensorMessagePubSubType()
        else:
            raise ValueError("DDS Reader data name specified not supported")
        
        self.topic_data_type.setName(data_name)
        self.type_support = fastdds.TypeSupport(self.topic_data_type)
        self.participant.register_type(self.type_support)

        # Create a DDS topic
        self.topic_qos = fastdds.TopicQos()
        self.participant.get_default_topic_qos(self.topic_qos)
        if data_name == "SensorMessage":
            self.topic = self.participant.create_topic(topic_name, self.topic_data_type.getName(), self.topic_qos)
        else:
            raise ValueError("DDS Reader data name specified not supported")

        self.subscriber_qos = fastdds.SubscriberQos()
        self.participant.get_default_subscriber_qos(self.subscriber_qos)
        self.subscriber = self.participant.create_subscriber(self.subscriber_qos)

        # Set a custom listener if provided
        if listener is None:
            self.listener = ReaderListener()
        else:
            self.listener = listener
        
        self.reader_qos = fastdds.DataReaderQos()
        self.subscriber.get_default_datareader_qos(self.reader_qos)
        self.reader = self.subscriber.create_datareader(self.topic, self.reader_qos, self.listener)
...

In the main code:

    def on_data_available(self, reader):
        super().on_data_available(reader)

        info = fastdds.SampleInfo()
        data = SensorMessage.SensorMessage()
        reader.take_next_sample(data, info)
        # Skip metadata-only samples (e.g. disposed instances)
        if not info.valid_data:
            return

        # Convert to SensorFormat
        sensor_data = SensorMsgOperations.ReadSensorMessage(data)
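To make sure the ~60 ms figure measures transport latency and not serialization or callback scheduling, one option is to reuse the existing `uuid` string field to carry a send timestamp, so no IDL change is needed. This is hypothetical instrumentation, not part of the code above, and it assumes publisher and subscriber share a clock (true here, since both containers run on the same host):

```python
import time

def stamp_uuid() -> str:
    """Publisher side: encode the send time (nanoseconds) as the uuid."""
    return str(time.time_ns())

def latency_ms(uuid: str) -> float:
    """Subscriber side: one-way latency in milliseconds."""
    return (time.time_ns() - int(uuid)) / 1e6

# Usage sketch:
#   publisher:  set the uuid field to stamp_uuid() just before writer.write()
#   subscriber: print(f"latency: {latency_ms(<uuid field>):.2f} ms")
sent = stamp_uuid()
time.sleep(0.01)
print(f"{latency_ms(sent):.1f} ms")  # roughly 10 ms for this sleep
```

Measuring right at `write()` and right inside `on_data_available` isolates the DDS path from the image build/parse steps in SensorMsgOperations.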

Is there some flag I am missing, or is there a known issue when using FastDDS Python inside Docker containers?
I would assume these values are not normal and that I should be achieving much better latencies.

Thanks in advance for the help 😄
