Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21519] Fix issue with exclusive ownership and unordered samples (backport #5182) #5216

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -830,13 +830,39 @@ bool DataReaderHistory::update_instance_nts(

assert(vit != instances_.end());
assert(false == change->isRead);
auto previous_owner = vit->second->current_owner.first;
++counters_.samples_unread;
bool ret =
vit->second->update_state(counters_, change->kind, change->writerGUID,
change->reader_info.writer_ownership_strength);
change->reader_info.disposed_generation_count = vit->second->disposed_generation_count;
change->reader_info.no_writers_generation_count = vit->second->no_writers_generation_count;

auto current_owner = vit->second->current_owner.first;
if (current_owner != previous_owner)
{
assert(current_owner == change->writerGUID);

// Remove all changes from different owners after the change.
DataReaderInstance::ChangeCollection& changes = vit->second->cache_changes;
auto it = std::lower_bound(changes.begin(), changes.end(), change, rtps::history_order_cmp);
assert(it != changes.end());
assert(*it == change);
++it;
while (it != changes.end())
{
if ((*it)->writerGUID != current_owner)
{
// Remove from history
remove_change_sub(*it, it);

// Current iterator will point to change next to the one removed. Avoid incrementing.
continue;
}
++it;
}
}

return ret;
}

Expand Down
136 changes: 136 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,142 @@ TEST_P(OwnershipQos, exclusive_kind_keyed_besteffort_disposing_instance)
exclusive_kind_keyed_disposing_instance(false);
}

/*!
* This is a regression test for redmine issue 20866.
*
* This test checks that a reader keeping a long number of samples and with an exclusive ownership policy only
* returns the data from the writer with the highest strength.
*
* @param use_keep_all_history Whether to use KEEP_ALL history or KEEP_LAST(20).
* @param mixed_data Whether to send data from both writers in an interleaved way.
*/
static void test_exclusive_kind_big_history(
bool use_keep_all_history,
bool mixed_data)
{
PubSubReader<KeyedHelloWorldPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<KeyedHelloWorldPubSubType> low_strength_writer(TEST_TOPIC_NAME);
PubSubWriter<KeyedHelloWorldPubSubType> high_strength_writer(TEST_TOPIC_NAME);

// Configure history QoS.
if (use_keep_all_history)
{
reader.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS);
low_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS);
high_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS);
}
else
{
reader.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20);
low_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20);
high_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20);
}

// Prepare data.
std::list<KeyedHelloWorld> generated_data = default_keyedhelloworld_data_generator(20);
auto middle = std::next(generated_data.begin(), 10);
std::list<KeyedHelloWorld> low_strength_data(generated_data.begin(), middle);
std::list<KeyedHelloWorld> high_strength_data(middle, generated_data.end());
auto expected_data = high_strength_data;

if (mixed_data)
{
// Expect reception of the first two samples from the low strength writer (one per instance).
auto it = low_strength_data.begin();
expected_data.push_front(*it++);
expected_data.push_front(*it);
}

// Initialize writers.
low_strength_writer.ownership_strength(3)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.init();
ASSERT_TRUE(low_strength_writer.isInitialized());

// High strength writer will use a custom transport to ensure its data is received after the low strength data.
auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
std::atomic<bool> drop_messages(false);
test_transport->messages_filter_ = [&drop_messages](eprosima::fastrtps::rtps::CDRMessage_t&)
{
return drop_messages.load();
};
high_strength_writer.ownership_strength(4)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.disable_builtin_transport()
.add_user_transport_to_pparams(test_transport)
.init();
ASSERT_TRUE(high_strength_writer.isInitialized());

// Initialize reader.
reader.ownership_exclusive()
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.init();
ASSERT_TRUE(reader.isInitialized());

// Wait for discovery.
low_strength_writer.wait_discovery();
high_strength_writer.wait_discovery();
reader.wait_discovery(std::chrono::seconds::zero(), 2);

// Drop the messages from the high strength writer, so they arrive later to the reader.
drop_messages.store(true);

if (mixed_data)
{
// Send one sample from each writer, with low strength data first.
while (!low_strength_data.empty() && !high_strength_data.empty())
{
EXPECT_TRUE(low_strength_writer.send_sample(low_strength_data.front()));
EXPECT_TRUE(high_strength_writer.send_sample(high_strength_data.front()));
low_strength_data.pop_front();
high_strength_data.pop_front();
}
}
else
{
// Send high strength data first, so it has the lowest source timestamps, but drop the messages, so they arrive
// later to the reader.
high_strength_writer.send(high_strength_data);
EXPECT_TRUE(high_strength_data.empty());

// Send low strength data, so it has the highest source timestamps.
low_strength_writer.send(low_strength_data);
EXPECT_TRUE(low_strength_data.empty());
}

// Wait for the reader to receive the low strength data.
EXPECT_TRUE(low_strength_writer.waitForAllAcked(std::chrono::seconds(1)));

// Let high strength writer send the data, and wait for the reader to receive it.
drop_messages.store(false);
EXPECT_TRUE(high_strength_writer.waitForAllAcked(std::chrono::seconds(1)));

// Make the reader process the data, expecting only the required data.
// The issue was reproduced by the reader complaining about reception of unexpected data.
reader.startReception(expected_data);
reader.block_for_all();
}

TEST(OwnershipQos, exclusive_kind_keep_all_reliable)
{
test_exclusive_kind_big_history(true, false);
}

TEST(OwnershipQos, exclusive_kind_keep_all_reliable_mixed)
{
test_exclusive_kind_big_history(true, true);
}

TEST(OwnershipQos, exclusive_kind_keep_last_reliable)
{
test_exclusive_kind_big_history(false, false);
}

TEST(OwnershipQos, exclusive_kind_keep_last_reliable_mixed)
{
test_exclusive_kind_big_history(false, true);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
Loading
Loading