Skip to content

Commit

Permalink
LARGE_DATA Participants logic with same listening ports (#4396)
Browse files Browse the repository at this point in the history
* Refs #20438: Add unittest to validate interfaces check

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20438: Add support to same listening_port and different IP address

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20438: Fix Windows build

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20438: Add Ipv6 addresses support

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20438: Remove repeated addition of listener port (added by default)

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20438: Apply suggestions

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20438: Apply suggestions: ipv6

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20438: Fix windows build

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

---------

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>
(cherry picked from commit 3ff9180)
  • Loading branch information
cferreiragonz committed Feb 28, 2024
1 parent 21be33e commit 7aff370
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 1 deletion.
23 changes: 22 additions & 1 deletion src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,10 +698,31 @@ bool TCPTransportInterface::OpenOutputChannel(
listening_port = config->listening_ports.front();
}

bool local_lower_interface = false;
if (IPLocator::getPhysicalPort(physical_locator) == listening_port)
{
std::vector<Locator> list;
std::vector<fastrtps::rtps::IPFinder::info_IP> local_interfaces;
get_ips(local_interfaces);
for (const auto& interface_it : local_interfaces)
{
Locator interface_loc(interface_it.locator);
interface_loc.port = physical_locator.port;
if (is_interface_allowed(interface_loc))
{
list.push_back(interface_loc);
}
}
if (!list.empty() && (list.front() < physical_locator))
{
local_lower_interface = true;
}
}

// If the remote physical port is higher than our listening port, a new CONNECT channel needs to be created and connected
// and the locator added to the send_resource_list.
// If the remote physical port is lower than our listening port, only the locator needs to be added to the send_resource_list.
if (IPLocator::getPhysicalPort(physical_locator) > listening_port)
if (IPLocator::getPhysicalPort(physical_locator) > listening_port || local_lower_interface)
{
// Client side (either Server-Client or LARGE_DATA)
logInfo(OpenOutputChannel, "OpenOutputChannel: [CONNECT] (physical: "
Expand Down
28 changes: 28 additions & 0 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1938,6 +1938,34 @@ TEST_F(TCPv4Tests, reconnect_after_open_port_failure)
client_resource_list.clear();
}

// This test verifies that OpenOutputChannel correctly handles a remote locator with
// same physical port as the local listening port.
TEST_F(TCPv4Tests, opening_output_channel_with_same_locator_as_local_listening_port)
{
TCPv4Transport transportUnderTest(descriptor);
transportUnderTest.init();

// Two locators with the same port as the local listening port, but different addresses
Locator_t lowerOutputChannelLocator;
lowerOutputChannelLocator.kind = LOCATOR_KIND_TCPv4;
lowerOutputChannelLocator.port = g_default_port;
IPLocator::setLogicalPort(lowerOutputChannelLocator, g_default_port);
Locator_t higherOutputChannelLocator = lowerOutputChannelLocator;
IPLocator::setIPv4(lowerOutputChannelLocator, 1, 1, 1, 1);
IPLocator::setIPv4(higherOutputChannelLocator, 255, 255, 255, 255);

SendResourceList send_resource_list;

// If the remote address is lower than the local one, no channel must be created but it must be added to the send_resource_list
ASSERT_TRUE(transportUnderTest.OpenOutputChannel(send_resource_list, lowerOutputChannelLocator));
ASSERT_FALSE(transportUnderTest.is_output_channel_open_for(lowerOutputChannelLocator));
ASSERT_EQ(send_resource_list.size(), 1);
// If the remote address is higher than the local one, a CONNECT channel must be created and added to the send_resource_list
ASSERT_TRUE(transportUnderTest.OpenOutputChannel(send_resource_list, higherOutputChannelLocator));
ASSERT_TRUE(transportUnderTest.is_output_channel_open_for(higherOutputChannelLocator));
ASSERT_EQ(send_resource_list.size(), 2);
}

void TCPv4Tests::HELPER_SetDescriptorDefaults()
{
descriptor.add_listener_port(g_default_port);
Expand Down
91 changes: 91 additions & 0 deletions test/unittest/transport/TCPv6Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,70 @@ TEST_F(TCPv6Tests, autofill_port)
EXPECT_TRUE(transportUnderTest_multiple_autofill.configuration()->listening_ports.size() == 3);
}

static void GetIP6s(
std::vector<IPFinder::info_IP>& interfaces)
{
IPFinder::getIPs(&interfaces, false);
auto new_end = remove_if(interfaces.begin(),
interfaces.end(),
[](IPFinder::info_IP ip)
{
return ip.type != IPFinder::IP6 && ip.type != IPFinder::IP6_LOCAL;
});
interfaces.erase(new_end, interfaces.end());
std::for_each(interfaces.begin(), interfaces.end(), [](IPFinder::info_IP& loc)
{
loc.locator.kind = LOCATOR_KIND_TCPv6;
});
}

TEST_F(TCPv6Tests, check_TCPv6_interface_whitelist_initialization)
{
std::vector<IPFinder::info_IP> interfaces;

GetIP6s(interfaces);

// asio::ip::addres_v6 appends the interface name to the IP address, but the locator does not
// Create two different vectors to compare them
std::vector<std::string> asio_interfaces;
std::vector<std::string> locator_interfaces;
for (auto& ip : interfaces)
{
asio_interfaces.push_back(ip.name);
locator_interfaces.push_back(IPLocator::toIPv6string(ip.locator));
}
// Add manually localhost to test adding multiple interfaces
asio_interfaces.push_back("::1");
locator_interfaces.push_back("::1");

for (auto& ip : locator_interfaces)
{
descriptor.interfaceWhiteList.emplace_back(ip);
}
descriptor.add_listener_port(g_default_port);
MockTCPv6Transport transportUnderTest(descriptor);
transportUnderTest.init();

// Check that the transport whitelist and the acceptors map is the same size as the locator_interfaces
ASSERT_EQ(transportUnderTest.get_interface_whitelist().size(), descriptor.interfaceWhiteList.size());
ASSERT_EQ(transportUnderTest.get_acceptors_map().size(), descriptor.interfaceWhiteList.size());

// Check that every interface is in the whitelist
auto check_whitelist = transportUnderTest.get_interface_whitelist();
for (auto& ip : asio_interfaces)
{
ASSERT_NE(std::find(check_whitelist.begin(), check_whitelist.end(), asio::ip::address_v6::from_string(
ip)), check_whitelist.end());
}

// Check that every interface is in the acceptors map
for (const auto& test : transportUnderTest.get_acceptors_map())
{
ASSERT_NE(std::find(locator_interfaces.begin(), locator_interfaces.end(), IPLocator::toIPv6string(
test.first)), locator_interfaces.end());
}
}

// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators.
// Clients typically communicated its PID as its locator port. When having several clients in the same
// process this lead to overwriting server's channel resources map elements.
Expand Down Expand Up @@ -412,6 +476,33 @@ TEST_F(TCPv6Tests, reconnect_after_open_port_failure)
client_resource_list.clear();
}

TEST_F(TCPv6Tests, opening_output_channel_with_same_locator_as_local_listening_port)
{
descriptor.add_listener_port(g_default_port);
TCPv6Transport transportUnderTest(descriptor);
transportUnderTest.init();

// Two locators with the same port as the local listening port, but different addresses
Locator_t lowerOutputChannelLocator;
lowerOutputChannelLocator.kind = LOCATOR_KIND_TCPv6;
lowerOutputChannelLocator.port = g_default_port;
IPLocator::setLogicalPort(lowerOutputChannelLocator, g_default_port);
Locator_t higherOutputChannelLocator = lowerOutputChannelLocator;
IPLocator::setIPv6(lowerOutputChannelLocator, "::");
IPLocator::setIPv6(higherOutputChannelLocator, "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff");

SendResourceList send_resource_list;

// If the remote address is lower than the local one, no channel must be created but it must be added to the send_resource_list
ASSERT_TRUE(transportUnderTest.OpenOutputChannel(send_resource_list, lowerOutputChannelLocator));
ASSERT_FALSE(transportUnderTest.is_output_channel_open_for(lowerOutputChannelLocator));
ASSERT_EQ(send_resource_list.size(), 1);
// If the remote address is higher than the local one, a CONNECT channel must be created and added to the send_resource_list
ASSERT_TRUE(transportUnderTest.OpenOutputChannel(send_resource_list, higherOutputChannelLocator));
ASSERT_TRUE(transportUnderTest.is_output_channel_open_for(higherOutputChannelLocator));
ASSERT_EQ(send_resource_list.size(), 2);
}

/*
TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports)
{
Expand Down

0 comments on commit 7aff370

Please sign in to comment.