Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[19576] Updatable disable_positive_acks period #3879

Merged
merged 9 commits into from
Oct 2, 2023
1 change: 1 addition & 0 deletions include/fastdds/dds/core/policy/QosPolicies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2264,6 +2264,7 @@ class DisablePositiveACKsQosPolicy : public Parameter_t, public QosPolicy
const DisablePositiveACKsQosPolicy& b) const
{
return enabled == b.enabled &&
duration == b.duration &&
Parameter_t::operator ==(b) &&
QosPolicy::operator ==(b);
}
Expand Down
3 changes: 2 additions & 1 deletion include/fastdds/dds/publisher/qos/DataWriterQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class RTPSReliableWriterQos
const RTPSReliableWriterQos& b) const
{
return (this->times == b.times) &&
(this->disable_positive_acks == b.disable_positive_acks);
(this->disable_positive_acks == b.disable_positive_acks) &&
(this->disable_heartbeat_piggyback == b.disable_heartbeat_piggyback);
}

//!Writer Timing Attributes
Expand Down
7 changes: 7 additions & 0 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,13 @@ class StatefulWriter : public RTPSWriter
void updateTimes(
const WriterTimes& times);

/**
* Update the period of the disable positive ACKs policy.
* @param att WriterAttributes parameter.
*/
void updatePositiveAcks(
const WriterAttributes& att);

SequenceNumber_t next_sequence_number() const;

/**
Expand Down
21 changes: 20 additions & 1 deletion src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1143,10 +1143,22 @@ ReturnCode_t DataWriterImpl::set_qos(
{
return ReturnCode_t::RETCODE_IMMUTABLE_POLICY;
}
set_qos(qos_, qos_to_set, !enabled);

set_qos(qos_, qos_to_set, enabled);

if (enabled)
{
if (qos_.reliability().kind == eprosima::fastrtps::RELIABLE_RELIABILITY_QOS &&
qos_.reliable_writer_qos() == qos_to_set.reliable_writer_qos())
{
// Update times and positive_acks attributes on RTPS Layer
WriterAttributes w_att;
w_att.times = qos_.reliable_writer_qos().times;
w_att.disable_positive_acks = qos_.reliable_writer_qos().disable_positive_acks.enabled;
w_att.keep_duration = qos_.reliable_writer_qos().disable_positive_acks.duration;
writer_->updateAttributes(w_att);
}

//Notify the participant that a Writer has changed its QOS
fastrtps::TopicAttributes topic_att = get_topic_attributes(qos_, *topic_, type_);
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
Expand Down Expand Up @@ -1884,6 +1896,13 @@ bool DataWriterImpl::can_qos_be_updated(
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Data sharing configuration cannot be changed after the creation of a DataWriter.");
}
if (to.reliable_writer_qos().disable_positive_acks.enabled !=
from.reliable_writer_qos().disable_positive_acks.enabled)
{
updatable = false;
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Only the period of Positive ACKs can be changed after the creation of a DataWriter.");
}
return updatable;
}

Expand Down
7 changes: 7 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,13 @@ bool DataReaderImpl::can_qos_be_updated(
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Unique network flows request cannot be changed after the creation of a DataReader.");
}
if (to.reliable_reader_qos().disable_positive_ACKs.enabled !=
from.reliable_reader_qos().disable_positive_ACKs.enabled)
{
updatable = false;
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Positive ACKs QoS cannot be changed after the creation of a DataReader.");
}
return updatable;
}

Expand Down
51 changes: 47 additions & 4 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace rtps {
/**
* Loops over all the readers in the vector, applying the given routine.
* The loop continues until the result of the routine is true for any reader
* or all readers have been processes.
* or all readers have been processed.
* The returned value is true if the routine returned true at any point,
* or false otherwise.
*/
Expand Down Expand Up @@ -953,6 +953,17 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
if (disable_positive_acks_ && last_sequence_number_ == SequenceNumber_t())
{
last_sequence_number_ = change->sequenceNumber;
if ( !(ack_event_->getRemainingTimeMilliSec() > 0))
{
// Restart ack_timer
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->restart_timer(max_blocking_time);
}
}

// Restore in case a exception was launched by RTPSMessageGroup.
Expand Down Expand Up @@ -1612,6 +1623,24 @@ void StatefulWriter::updateAttributes(
const WriterAttributes& att)
{
this->updateTimes(att.times);
if (this->get_disable_positive_acks())
{
this->updatePositiveAcks(att);
}
}

void StatefulWriter::updatePositiveAcks(
const WriterAttributes& att)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3))
{
// Implicit conversion to microseconds
keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()};
Mario-DL marked this conversation as resolved.
Show resolved Hide resolved
}
// Restart ack timer with new duration
ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3);
ack_event_->restart_timer();
}

void StatefulWriter::updateTimes(
Expand Down Expand Up @@ -2030,26 +2059,40 @@ bool StatefulWriter::ack_timer_expired()

while (interval.count() < 0)
{
bool acks_flag = false;
for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
[this](ReaderProxy* reader)
[this, &acks_flag](ReaderProxy* reader)
{
if (reader->disable_positive_acks())
{
reader->acked_changes_set(last_sequence_number_ + 1);
acks_flag = true;
}
return false;
}
);
last_sequence_number_++;
if (acks_flag)
{
check_acked_status();
}

// Get the next cache change from the history
CacheChange_t* change;

// Skip removed changes until reaching the last change
do
{
last_sequence_number_++;
} while (!mp_history->get_change(
last_sequence_number_,
getGuid(),
&change) && last_sequence_number_ < next_sequence_number());
Mario-DL marked this conversation as resolved.
Show resolved Hide resolved

if (!mp_history->get_change(
last_sequence_number_,
getGuid(),
&change))
{
// Stop ack_timer
return false;
}

Expand Down
11 changes: 11 additions & 0 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1470,6 +1470,17 @@ class PubSubWriter
return (ReturnCode_t::RETCODE_OK == datawriter_->set_qos(datawriter_qos_));
}

bool set_qos(
const eprosima::fastdds::dds::DataWriterQos& att)
{
return (ReturnCode_t::RETCODE_OK == datawriter_->set_qos(att));
}

eprosima::fastdds::dds::DataWriterQos get_qos()
{
return (datawriter_->get_qos());
}

bool remove_all_changes(
size_t* number_of_changes_removed)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,23 +12,138 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "BlackboxTests.hpp"

#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"
#include "ReqRepAsReliableHelloWorldRequester.hpp"
#include "ReqRepAsReliableHelloWorldReplier.hpp"
#include <atomic>
#include <condition_variable>
#include <gmock/gmock-matchers.h>
#include <mutex>
#include <thread>

#include <gtest/gtest.h>

#include <gmock/gmock.h>

#include <fastdds/core/policy/ParameterSerializer.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantFactoryQos.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
#include <fastdds/dds/publisher/qos/PublisherQos.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/dds/subscriber/qos/SubscriberQos.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/topic/qos/TopicQos.hpp>
#include <fastdds/dds/topic/Topic.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/rtps/participant/ParticipantDiscoveryInfo.h>
#include <fastrtps/utils/TimeConversion.h>
#include <fastrtps/transport/test_UDPv4TransportDescriptor.h>
#include <rtps/transport/test_UDPv4Transport.h>
#include <fastrtps/types/TypesBase.h>

#include "BlackboxTests.hpp"
#include "../api/dds-pim/CustomPayloadPool.hpp"
#include "../api/dds-pim/PubSubReader.hpp"
#include "../api/dds-pim/PubSubWriter.hpp"
#include "../api/dds-pim/ReqRepAsReliableHelloWorldRequester.hpp"
#include "../api/dds-pim/ReqRepAsReliableHelloWorldReplier.hpp"
#include "../types/FixedSized.h"
#include "../types/FixedSizedPubSubTypes.h"
#include "../types/HelloWorldPubSubTypes.h"

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using test_UDPv4Transport = eprosima::fastdds::rtps::test_UDPv4Transport;
using test_UDPv4TransportDescriptor = eprosima::fastdds::rtps::test_UDPv4TransportDescriptor;


TEST(AcknackQos, DDSEnableUpdatabilityOfPositiveAcksPeriodDDSLayer)
{
Mario-DL marked this conversation as resolved.
Show resolved Hide resolved
// This test checks the behaviour of disabling positive ACKs.
// It also checks that only the positive ACKs
// period is updatable at runtime through set_qos.

PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

// Configure datapublisher_qos
writer.keep_duration({1, 0});
writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS);
writer.durability_kind(eprosima::fastrtps::VOLATILE_DURABILITY_QOS);
writer.init();

ASSERT_TRUE(writer.isInitialized());

// Configure datasubscriber_qos
reader.keep_duration({1, 0});
reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS);
reader.init();

ASSERT_TRUE(reader.isInitialized());

// Check correct initialitation
eprosima::fastdds::dds::DataWriterQos get_att = writer.get_qos();
EXPECT_TRUE(get_att.reliable_writer_qos().disable_positive_acks.enabled);
EXPECT_EQ(get_att.reliable_writer_qos().disable_positive_acks.duration, eprosima::fastrtps::Duration_t({1, 0}));

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_helloworld_data_generator();

reader.startReception(data);
// Send data
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_all();
// Wait for all acked msgs
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::milliseconds(1200)));

// Wait to disable timer because no new messages are sent
std::this_thread::sleep_for(std::chrono::milliseconds(1200));
// Send a new message to check that timer is restarted correctly
data = default_helloworld_data_generator(1);
reader.startReception(data);
writer.send(data);
ASSERT_TRUE(data.empty());
reader.block_for_all();
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::milliseconds(1200)));

// Update attributes on DDS layer
eprosima::fastdds::dds::DataWriterQos w_att = writer.get_qos();
w_att.reliable_writer_qos().disable_positive_acks.enabled = true;
w_att.reliable_writer_qos().disable_positive_acks.duration = eprosima::fastrtps::Duration_t({2, 0});

EXPECT_TRUE(writer.set_qos(w_att));

// Check that period has been changed in DataWriterQos
get_att = writer.get_qos();
EXPECT_TRUE(get_att.reliable_writer_qos().disable_positive_acks.enabled);
EXPECT_EQ(get_att.reliable_writer_qos().disable_positive_acks.duration, eprosima::fastrtps::Duration_t({2, 0}));

data = default_helloworld_data_generator();

reader.startReception(data);
// Send data
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_all();
// Check that period has been correctly updated
EXPECT_FALSE(writer.waitForAllAcked(std::chrono::milliseconds(1200)));
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::milliseconds(1200)));

// Try to disable positive_acks
w_att.reliable_writer_qos().disable_positive_acks.enabled = false;

// Check that is not possible to change disable_positive_acks at runtime
EXPECT_FALSE(writer.set_qos(w_att));
}

TEST(AcknackQos, RecoverAfterLosingCommunicationWithDisablePositiveAck)
{
// This test makes the writer send a few samples
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/common/RTPSAsSocketReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,13 @@ class RTPSAsSocketReader
}
}

RTPSAsSocketReader& disable_positive_acks(
bool disable)
{
reader_attr_.disable_positive_acks = disable;
return *this;
}

private:

void receive_one(
Expand Down
Loading
Loading