From 9e69e26d0494c9cbc7661913fb291a26144f1903 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Dom=C3=ADnguez=20L=C3=B3pez?= <116071334+Mario-DL@users.noreply.github.com> Date: Wed, 4 Oct 2023 15:17:41 +0200 Subject: [PATCH] Notify datasharing listener also when intraprocess (#3875) * Refs #19570: Regression Test Signed-off-by: Mario Dominguez * Refs #19570: Notify datasharing listener also when intraprocess Signed-off-by: Mario Dominguez * Refs #19570: Address reviewer changes Signed-off-by: Mario Dominguez --------- Signed-off-by: Mario Dominguez --- src/cpp/rtps/reader/StatefulReader.cpp | 2 +- .../common/DDSBlackboxTestsDataReader.cpp | 67 +++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index c1f9f6b34d5..6a4e4b72378 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -311,7 +311,7 @@ bool StatefulReader::matched_writer_add( wp->lost_changes_update(last_seq + 1); } } - else if (!is_same_process) + else { // simulate a notification to force reading of transient changes datasharing_listener_->notify(false); diff --git a/test/blackbox/common/DDSBlackboxTestsDataReader.cpp b/test/blackbox/common/DDSBlackboxTestsDataReader.cpp index 2d092f059cc..ab4ce661d3f 100644 --- a/test/blackbox/common/DDSBlackboxTestsDataReader.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDataReader.cpp @@ -228,6 +228,73 @@ TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo) ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count(); } +//! Regression test for Issues #3822 Github #3875 +//! This test needs to late join a reader in the same process. +//! Not setting this test as parametrized since it only makes sense in intraprocess. +//! Note: Without the fix, the test fails ~1/10 times, it is encouraged to launch +//! the test with --retest-until-fail 50 +TEST(DDSDataReader, ConsistentReliabilityWhenIntraprocess) +{ + //! Manually set intraprocess + LibrarySettingsAttributes library_settings; + library_settings.intraprocess_delivery = eprosima::fastrtps::INTRAPROCESS_FULL; + xmlparser::XMLProfileManager::library_settings(library_settings); + + auto participant = DomainParticipantFactory::get_instance()->create_participant( + (uint32_t)GET_PID() % 230, + DomainParticipantFactory::get_instance()->get_default_participant_qos(), nullptr, + eprosima::fastdds::dds::StatusMask::none()); + + eprosima::fastdds::dds::TypeSupport t_type{ new HelloWorldPubSubType() }; + ASSERT_TRUE(t_type.register_type( participant ) == ReturnCode_t::RETCODE_OK); + + auto topic = participant->create_topic( TEST_TOPIC_NAME, t_type.get_type_name(), + participant->get_default_topic_qos()); + + // create publisher and writer + auto publisher = participant->create_publisher( participant->get_default_publisher_qos()); + + auto writer_qos = eprosima::fastdds::dds::DATAWRITER_QOS_DEFAULT; + writer_qos.durability().kind = DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS; + writer_qos.reliability().kind = ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS; + auto writer = publisher->create_datawriter( topic, writer_qos ); + + auto data = HelloWorld{}; + ASSERT_TRUE(writer->write( &data )); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + // create a late joiner subscriber and reader + auto subscriber = participant->create_subscriber( participant->get_default_subscriber_qos()); + auto reader_qos = eprosima::fastdds::dds::DATAREADER_QOS_DEFAULT; + reader_qos.durability().kind = DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS; + reader_qos.reliability().kind = ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS; + auto reader = subscriber->create_datareader( topic, reader_qos ); + + eprosima::fastdds::dds::SubscriptionMatchedStatus status; + reader->get_subscription_matched_status(status); + ASSERT_GT(status.total_count, 0); + + // wait for message + uint64_t unread_count = 0; + auto t0 = std::chrono::steady_clock::now(); + while (unread_count <= 0 && + (std::chrono::duration_cast(std::chrono::steady_clock::now() - t0)).count() < 2) + { + unread_count = reader->get_unread_count(true); + std::this_thread::sleep_for( std::chrono::milliseconds( 100 )); + } + + participant->delete_contained_entities(); + DomainParticipantFactory::get_instance()->delete_participant(participant); + + ASSERT_TRUE(unread_count > 0); + + //! Reset back to INTRAPROCESS_OFF + library_settings.intraprocess_delivery = eprosima::fastrtps::INTRAPROCESS_OFF; + xmlparser::XMLProfileManager::library_settings(library_settings); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else