diff --git a/src/cpp/fastdds/publisher/DataWriterHistory.cpp b/src/cpp/fastdds/publisher/DataWriterHistory.cpp index 19a2ae66483..a770b25deb4 100644 --- a/src/cpp/fastdds/publisher/DataWriterHistory.cpp +++ b/src/cpp/fastdds/publisher/DataWriterHistory.cpp @@ -151,6 +151,8 @@ bool DataWriterHistory::prepare_change( if (history_qos_.kind == KEEP_ALL_HISTORY_QOS) { ret = this->mp_writer->try_remove_change(max_blocking_time, lock); + // If change was removed (ret == 1) in KeepAllHistory, it must have been acked + is_acked = ret; } else if (history_qos_.kind == KEEP_LAST_HISTORY_QOS) { diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 26fff170ec1..6e65fbcc733 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -3428,6 +3428,96 @@ TEST(DDSStatus, entire_history_acked_volatile_unknown_pointer) } } +/*ยก + * Regression Test for 22648: on_unacknowledged_sample_removed callback is called when writer with keep all + * history is used, when the history was full but before max_blocking_time a sample was acknowledged, as is_acked was + * checked before the waiting time, and is not re-checked. This should not happen. + */ +TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call) +{ + auto test_transport = std::make_shared(); + test_transport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg) -> bool + { + static std::vector> delayed_messages; + + uint32_t old_pos = msg.pos; + + // see RTPS DDS 9.4.5.3 Data Submessage + EntityId_t writerID; + SequenceNumber_t sn; + + msg.pos += 2; // flags + msg.pos += 2; // inline QoS + msg.pos += 4; // reader ID + msg.pos += 4; + CDRMessage::readEntityId(&msg, &writerID); + CDRMessage::readSequenceNumber(&msg, &sn); + + // Restore buffer position + msg.pos = old_pos; + + // Delay logic for user endpoints only + if ((writerID.value[3] & 0xC0) == 0) // only user endpoints + { + auto now = std::chrono::steady_clock::now(); + auto it = std::find_if(delayed_messages.begin(), delayed_messages.end(), + [&sn](const auto& pair) + { + return pair.first == sn; + }); + + if (it == delayed_messages.end()) + { + // If the sequence number is encountered for the first time, start the delay + delayed_messages.emplace_back(sn, now + std::chrono::milliseconds(750)); // Add delay + return true; // Start dropping this message + } + else if (now < it->second) + { + // If the delay period has not elapsed, keep dropping the message + return true; + } + else + { + // Once the delay has elapsed, allow the message to proceed + delayed_messages.erase(it); + } + } + return false; // Allow message to proceed + }; + + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, eprosima::fastrtps::Duration_t (200, 0)) + .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) + .resource_limits_max_instances(1) + .resource_limits_max_samples(1) + .resource_limits_max_samples_per_instance(1) + .disable_builtin_transport() + .add_user_transport_to_pparams(test_transport) + .init(); + ASSERT_TRUE(writer.isInitialized()); + + reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .init(); + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_helloworld_data_generator(2); + + for (auto sample : data) + { + writer.send_sample(sample); + } + + EXPECT_EQ(writer.times_unack_sample_removed(), 0u); +} + /*! * Test that checks with a writer of each type that having the same listener attached, the notified writer in the * callback is the corresponding writer that has removed a sample unacknowledged.