Skip to content

Commit

Permalink
new additional check for CSubscriber::GetPublisherCount using the con…
Browse files Browse the repository at this point in the history
…nection state of the matching subscriber (should ensure that GetPublisherCount is increased only if connected publisher is able to send data)

test added to test new behavior
  • Loading branch information
rex-schilasky committed Jun 20, 2024
1 parent 0221d4e commit 7221663
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 6 deletions.
18 changes: 13 additions & 5 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -281,8 +281,12 @@ namespace eCAL

iter->second->ApplyLocLayerParameter(process_id, topic_id, tlayer.type(), writer_par);
}
// inform for local publisher connection
iter->second->ApplyLocPublication(process_id, topic_id, topic_info);
// we only inform the subscriber when the publisher has already recognized at least one local subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
if (ecal_sample_.topic().connections_loc() > 0)
{
iter->second->ApplyLocPublication(process_id, topic_id, topic_info);
}
}
}

Expand Down Expand Up @@ -331,8 +335,12 @@ namespace eCAL
const std::string writer_par = tlayer.par_layer().SerializeAsString();
iter->second->ApplyExtLayerParameter(host_name, tlayer.type(), writer_par);
}
// inform for external publisher connection
iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info);
// we only inform the subscriber when the publisher has already recognized at least one external subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
if (ecal_sample_.topic().connections_ext() > 0)
{
iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info);
}
}
}

Expand Down
96 changes: 95 additions & 1 deletion testing/ecal/pubsub_test/src/pubsub_receive_test.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -219,3 +219,97 @@ TEST(SUBSCRIBER, SporadicEmptyReceives)
// finalize eCAL API
EXPECT_EQ(0, eCAL::Finalize());
}

TEST(PubSub, TestSubscriberSeen)
{
// initialize eCAL API
EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen"));

// enable data loopback
eCAL::Util::EnableLoopback(true);

std::atomic<bool> subscriber_seen_at_publication_start(false);
std::atomic<bool> subscriber_seen_at_publication_end(false);

std::atomic<bool> do_start_publication(false);
std::atomic<bool> publication_finished(false);

// publishing thread
auto publisher_thread = [&]() {
eCAL::CPublisher pub("blob");
pub.ShmSetAcknowledgeTimeout(500);

int cnt(0);
const auto max_runs(1000);
while (eCAL::Ok())
{
if (do_start_publication && cnt < max_runs)
{
if (cnt == 0)
{
subscriber_seen_at_publication_start = pub.IsSubscribed();
}

pub.Send(std::to_string(cnt));
cnt++;

if (cnt == max_runs)
{
subscriber_seen_at_publication_end = pub.IsSubscribed();
publication_finished = true;
break;
}
}
}
};

// subscribing thread
auto subscriber_thread = [&]() {
eCAL::CSubscriber sub("blob");
bool received(false);
auto max_lines(10);
auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_)
{
if (max_lines != 0)
{
// the final log should look like this
// -----------------------------------
// Receiving 0
// Receiving 1
// Receiving 2
// Receiving 3
// Receiving 4
// Receiving 5
// Receiving 6
// Receiving 7
// Receiving 8
// Receiving 9
// -----------------------------------
std::cout << "Receiving " << std::string(static_cast<const char*>(data_->buf), data_->size) << std::endl;
max_lines--;
}
};
sub.AddReceiveCallback(receive_lambda);

while (eCAL::Ok() && !publication_finished)
{
//if (sub.IsPublished()) do_start_publication = true;
if (sub.GetPublisherCount() > 0) do_start_publication = true;
}
};

// create threads for publisher and subscriber
std::thread pub_thread(publisher_thread);
std::thread sub_thread(subscriber_thread);

// join threads to the main thread
pub_thread.join();
sub_thread.join();

// finalize eCAL API
eCAL::Finalize();

// check if the publisher has seen the subscriber
EXPECT_TRUE(subscriber_seen_at_publication_start);
EXPECT_TRUE(subscriber_seen_at_publication_end);
}

0 comments on commit 7221663

Please sign in to comment.