Skip to content

Commit

Permalink
Updatable disable_positive_acks period (#3895)
Browse files Browse the repository at this point in the history
* Updatable disable_positive_acks period (#3879)

* Refs #19576: Test Update positive_acks period on RTPS Layer

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #19567: Fix Update positive_acks period on RTPS Layer

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #19576: Test Update positive_acks period on DDS Layer

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #19576: Fix ack_timer and Updatability of positive_acks on DDS Layer

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #19576: Uncrustify fix

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #19576: Fix linux ci

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Ref #19576: DataReaderImpl update and updateAttributes call

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #19576: Fix mac-ci

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #19576: move if clause

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

---------

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>
(cherry picked from commit b84825a)

* Fix updatability of immutable DataWriterQos (#3915)

* Refs #19687. Add regression test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19687. Rename argument.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19687. Refactor on DataWriterImpl::set_qos.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

---------

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Fix build after rebase

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

---------

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Co-authored-by: Carlos Ferreira <carloos.499@gmail.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
3 people committed Oct 26, 2023
1 parent 4e68083 commit bd34703
Show file tree
Hide file tree
Showing 13 changed files with 498 additions and 81 deletions.
1 change: 1 addition & 0 deletions include/fastdds/dds/core/policy/QosPolicies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2261,6 +2261,7 @@ class DisablePositiveACKsQosPolicy : public Parameter_t, public QosPolicy
const DisablePositiveACKsQosPolicy& b) const
{
return enabled == b.enabled &&
duration == b.duration &&
Parameter_t::operator ==(b) &&
QosPolicy::operator ==(b);
}
Expand Down
3 changes: 2 additions & 1 deletion include/fastdds/dds/publisher/qos/DataWriterQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class RTPSReliableWriterQos
const RTPSReliableWriterQos& b) const
{
return (this->times == b.times) &&
(this->disable_positive_acks == b.disable_positive_acks);
(this->disable_positive_acks == b.disable_positive_acks) &&
(this->disable_heartbeat_piggyback == b.disable_heartbeat_piggyback);
}

//!Writer Timing Attributes
Expand Down
7 changes: 7 additions & 0 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,13 @@ class StatefulWriter : public RTPSWriter
void updateTimes(
const WriterTimes& times);

/**
* Update the period of the disable positive ACKs policy.
* @param att WriterAttributes parameter.
*/
void updatePositiveAcks(
const WriterAttributes& att);

SequenceNumber_t next_sequence_number() const;

/**
Expand Down
176 changes: 109 additions & 67 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1123,10 +1123,22 @@ ReturnCode_t DataWriterImpl::set_qos(
{
return ReturnCode_t::RETCODE_IMMUTABLE_POLICY;
}

set_qos(qos_, qos_to_set, !enabled);

if (enabled)
{
if (qos_.reliability().kind == eprosima::fastrtps::RELIABLE_RELIABILITY_QOS &&
qos_.reliable_writer_qos() == qos_to_set.reliable_writer_qos())
{
// Update times and positive_acks attributes on RTPS Layer
WriterAttributes w_att;
w_att.times = qos_.reliable_writer_qos().times;
w_att.disable_positive_acks = qos_.reliable_writer_qos().disable_positive_acks.enabled;
w_att.keep_duration = qos_.reliable_writer_qos().disable_positive_acks.duration;
writer_->updateAttributes(w_att);
}

//Notify the participant that a Writer has changed its QOS
fastrtps::TopicAttributes topic_att = get_topic_attributes(qos_, *topic_, type_);
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
Expand Down Expand Up @@ -1602,114 +1614,137 @@ LivelinessLostStatus& DataWriterImpl::update_liveliness_lost_status(
void DataWriterImpl::set_qos(
DataWriterQos& to,
const DataWriterQos& from,
bool is_default)
bool update_immutable)
{
if (is_default && !(to.durability() == from.durability()))
{
to.durability() = from.durability();
to.durability().hasChanged = true;
}
if (is_default && !(to.durability_service() == from.durability_service()))
// Check immutable policies
if (update_immutable)
{
to.durability_service() = from.durability_service();
to.durability_service().hasChanged = true;
if (!(to.durability() == from.durability()))
{
to.durability() = from.durability();
to.durability().hasChanged = true;
}

if (!(to.durability_service() == from.durability_service()))
{
to.durability_service() = from.durability_service();
to.durability_service().hasChanged = true;
}

if (!(to.liveliness() == from.liveliness()))
{
to.liveliness() = from.liveliness();
to.liveliness().hasChanged = true;
}

if (!(to.reliability().kind == from.reliability().kind))
{
to.reliability().kind = from.reliability().kind;
to.reliability().hasChanged = true;
}

if (!(to.destination_order() == from.destination_order()))
{
to.destination_order() = from.destination_order();
to.destination_order().hasChanged = true;
}

if (!(to.history() == from.history()))
{
to.history() = from.history();
to.history().hasChanged = true;
}

if (!(to.resource_limits() == from.resource_limits()))
{
to.resource_limits() = from.resource_limits();
to.resource_limits().hasChanged = true;
}

if (!(to.ownership() == from.ownership()))
{
to.ownership() = from.ownership();
to.ownership().hasChanged = true;
}

to.publish_mode() = from.publish_mode();

if (!(to.representation() == from.representation()))
{
to.representation() = from.representation();
to.representation().hasChanged = true;
}

to.properties() = from.properties();

if (!(to.reliable_writer_qos() == from.reliable_writer_qos()))
{
RTPSReliableWriterQos& rel_to = to.reliable_writer_qos();
rel_to.disable_heartbeat_piggyback = from.reliable_writer_qos().disable_heartbeat_piggyback;
rel_to.disable_positive_acks.enabled = from.reliable_writer_qos().disable_positive_acks.enabled;
}

to.endpoint() = from.endpoint();

to.writer_resource_limits() = from.writer_resource_limits();

to.data_sharing() = from.data_sharing();

to.throughput_controller() = from.throughput_controller();
}

if (!(to.deadline() == from.deadline()))
{
to.deadline() = from.deadline();
to.deadline().hasChanged = true;
}

if (!(to.latency_budget() == from.latency_budget()))
{
to.latency_budget() = from.latency_budget();
to.latency_budget().hasChanged = true;
}
if (is_default && !(to.liveliness() == from.liveliness()))
{
to.liveliness() = from.liveliness();
to.liveliness().hasChanged = true;
}
if (is_default && !(to.reliability() == from.reliability()))

if (!(to.reliability().max_blocking_time == from.reliability().max_blocking_time))
{
to.reliability() = from.reliability();
to.reliability().max_blocking_time = from.reliability().max_blocking_time;
to.reliability().hasChanged = true;
}
if (is_default && !(to.destination_order() == from.destination_order()))
{
to.destination_order() = from.destination_order();
to.destination_order().hasChanged = true;
}
if (is_default && !(to.history() == from.history()))
{
to.history() = from.history();
to.history().hasChanged = true;
}
if (is_default && !(to.resource_limits() == from.resource_limits()))
{
to.resource_limits() = from.resource_limits();
to.resource_limits().hasChanged = true;
}

if (!(to.transport_priority() == from.transport_priority()))
{
to.transport_priority() = from.transport_priority();
to.transport_priority().hasChanged = true;
}

if (!(to.lifespan() == from.lifespan()))
{
to.lifespan() = from.lifespan();
to.lifespan().hasChanged = true;
}

if (!(to.user_data() == from.user_data()))
{
to.user_data() = from.user_data();
to.user_data().hasChanged = true;
}
if (is_default && !(to.ownership() == from.ownership()))
{
to.ownership() = from.ownership();
to.ownership().hasChanged = true;
}

if (!(to.ownership_strength() == from.ownership_strength()))
{
to.ownership_strength() = from.ownership_strength();
to.ownership_strength().hasChanged = true;
}

if (!(to.writer_data_lifecycle() == from.writer_data_lifecycle()))
{
to.writer_data_lifecycle() = from.writer_data_lifecycle();
}
if (is_default && !(to.publish_mode() == from.publish_mode()))
{
to.publish_mode() = from.publish_mode();
}
if (!(to.representation() == from.representation()))
{
to.representation() = from.representation();
to.representation().hasChanged = true;
}
if (is_default && !(to.properties() == from.properties()))
{
to.properties() = from.properties();
}
if (is_default && !(to.reliable_writer_qos() == from.reliable_writer_qos()))
{
to.reliable_writer_qos() = from.reliable_writer_qos();
}
if (is_default && !(to.endpoint() == from.endpoint()))
{
to.endpoint() = from.endpoint();
}
if (is_default && !(to.writer_resource_limits() == from.writer_resource_limits()))
{
to.writer_resource_limits() = from.writer_resource_limits();
}
if (is_default && !(to.throughput_controller() == from.throughput_controller()))

if (!(to.reliable_writer_qos() == from.reliable_writer_qos()))
{
to.throughput_controller() = from.throughput_controller();
}
if (is_default && !(to.data_sharing() == from.data_sharing()))
{
to.data_sharing() = from.data_sharing();
RTPSReliableWriterQos& rel_to = to.reliable_writer_qos();
rel_to.times = from.reliable_writer_qos().times;
rel_to.disable_positive_acks.duration = from.reliable_writer_qos().disable_positive_acks.duration;
}
}

Expand Down Expand Up @@ -1864,6 +1899,13 @@ bool DataWriterImpl::can_qos_be_updated(
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Data sharing configuration cannot be changed after the creation of a DataWriter.");
}
if (to.reliable_writer_qos().disable_positive_acks.enabled !=
from.reliable_writer_qos().disable_positive_acks.enabled)
{
updatable = false;
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Only the period of Positive ACKs can be changed after the creation of a DataWriter.");
}
return updatable;
}

Expand Down
2 changes: 1 addition & 1 deletion src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
static void set_qos(
DataWriterQos& to,
const DataWriterQos& from,
bool is_default);
bool update_immutable);

/**
* Extends the check_qos() call, including the check for
Expand Down
7 changes: 7 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1549,6 +1549,13 @@ bool DataReaderImpl::can_qos_be_updated(
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Unique network flows request cannot be changed after the creation of a DataReader.");
}
if (to.reliable_reader_qos().disable_positive_ACKs.enabled !=
from.reliable_reader_qos().disable_positive_ACKs.enabled)
{
updatable = false;
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Positive ACKs QoS cannot be changed after the creation of a DataReader.");
}
return updatable;
}

Expand Down
51 changes: 47 additions & 4 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace rtps {
/**
* Loops over all the readers in the vector, applying the given routine.
* The loop continues until the result of the routine is true for any reader
* or all readers have been processes.
* or all readers have been processed.
* The returned value is true if the routine returned true at any point,
* or false otherwise.
*/
Expand Down Expand Up @@ -947,6 +947,17 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
if (disable_positive_acks_ && last_sequence_number_ == SequenceNumber_t())
{
last_sequence_number_ = change->sequenceNumber;
if ( !(ack_event_->getRemainingTimeMilliSec() > 0))
{
// Restart ack_timer
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->restart_timer(max_blocking_time);
}
}

// Restore in case a exception was launched by RTPSMessageGroup.
Expand Down Expand Up @@ -1606,6 +1617,24 @@ void StatefulWriter::updateAttributes(
const WriterAttributes& att)
{
this->updateTimes(att.times);
if (this->get_disable_positive_acks())
{
this->updatePositiveAcks(att);
}
}

void StatefulWriter::updatePositiveAcks(
const WriterAttributes& att)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3))
{
// Implicit conversion to microseconds
keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()};
}
// Restart ack timer with new duration
ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3);
ack_event_->restart_timer();
}

void StatefulWriter::updateTimes(
Expand Down Expand Up @@ -2024,26 +2053,40 @@ bool StatefulWriter::ack_timer_expired()

while (interval.count() < 0)
{
bool acks_flag = false;
for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
[this](ReaderProxy* reader)
[this, &acks_flag](ReaderProxy* reader)
{
if (reader->disable_positive_acks())
{
reader->acked_changes_set(last_sequence_number_ + 1);
acks_flag = true;
}
return false;
}
);
last_sequence_number_++;
if (acks_flag)
{
check_acked_status();
}

// Get the next cache change from the history
CacheChange_t* change;

// Skip removed changes until reaching the last change
do
{
last_sequence_number_++;
} while (!mp_history->get_change(
last_sequence_number_,
getGuid(),
&change) && last_sequence_number_ < next_sequence_number());

if (!mp_history->get_change(
last_sequence_number_,
getGuid(),
&change))
{
// Stop ack_timer
return false;
}

Expand Down
Loading

0 comments on commit bd34703

Please sign in to comment.