Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Size & CRC in dfu-start; topic-reader stop-from-callback crashes #12789

Merged
merged 3 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/dds/rs-dds-device-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <rsutils/json.h>
#include <rsutils/string/hexarray.h>
#include <rsutils/string/from.h>
#include <rsutils/number/crc32.h>

using rsutils::json;

Expand Down Expand Up @@ -560,10 +561,14 @@ bool dds_device_proxy::check_fw_compatibility( const std::vector< uint8_t > & im
try
{
// Start DFU
auto const crc = rsutils::number::calc_crc32( image.data(), image.size() );
json dfu_start{
{ realdds::topics::control::key::id, realdds::topics::control::dfu_start::id },
{ realdds::topics::control::dfu_start::key::size, image.size() },
{ realdds::topics::control::dfu_start::key::crc, crc },
};
json reply;
_dds_dev->send_control(
json::object( { { realdds::topics::control::key::id, realdds::topics::control::dfu_start::id } } ),
&reply );
_dds_dev->send_control( dfu_start, &reply );

// Set up a reply handler that will get the "dfu-ready" message
std::mutex mutex;
Expand Down
2 changes: 2 additions & 0 deletions third-party/realdds/include/realdds/dds-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class dds_stream : public dds_stream_base
dds_stream( std::string const & stream_name, std::string const & sensor_name );

public:
~dds_stream();

bool is_open() const override { return !! _reader; }
virtual void open( std::string const & topic_name, std::shared_ptr< dds_subscriber > const & ) = 0;
virtual void close();
Expand Down
15 changes: 11 additions & 4 deletions third-party/realdds/include/realdds/dds-topic-reader-thread.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2023 Intel Corporation. All Rights Reserved.

// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved.
#pragma once

#include "dds-topic-reader.h"

#include <fastdds/dds/core/condition/GuardCondition.hpp>
#include <thread>


namespace eprosima {
namespace fastdds {
namespace dds {
class GuardCondition;
} // namespace dds
} // namespace fastdds
} // namespace eprosima


namespace realdds {


Expand All @@ -31,7 +38,7 @@ class dds_topic_reader_thread : public dds_topic_reader
{
typedef dds_topic_reader super;

eprosima::fastdds::dds::GuardCondition _stopped;
std::shared_ptr< eprosima::fastdds::dds::GuardCondition > _stopped;
std::thread _th;

public:
Expand Down
8 changes: 5 additions & 3 deletions third-party/realdds/include/realdds/dds-topic-reader.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2022 Intel Corporation. All Rights Reserved.
// Copyright(c) 2022-4 Intel Corporation. All Rights Reserved.
#pragma once

#include <fastdds/dds/subscriber/DataReaderListener.hpp>
Expand Down Expand Up @@ -31,11 +31,13 @@ class dds_subscriber;
// You may choose to create one via a 'subscriber' that manages the activities of several readers.
// on_data_available callback will be called when a sample is received.
//
class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener
class dds_topic_reader
: public eprosima::fastdds::dds::DataReaderListener
, public std::enable_shared_from_this< dds_topic_reader >
{
protected:
std::shared_ptr< dds_topic > const _topic;
std::shared_ptr < dds_subscriber > const _subscriber;
std::shared_ptr< dds_subscriber > const _subscriber;

eprosima::fastdds::dds::DataReader * _reader = nullptr;

Expand Down
4 changes: 4 additions & 0 deletions third-party/realdds/include/realdds/topics/dds-topic-names.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ namespace control {
}
namespace dfu_start {
extern std::string const id;
namespace key {
extern std::string const crc;
extern std::string const size;
}
}
namespace dfu_apply {
using notification::dfu_apply::id;
Expand Down
11 changes: 11 additions & 0 deletions third-party/realdds/src/dds-device-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ dds_device::impl::impl( std::shared_ptr< dds_participant > const & participant,
}


dds_device::impl::~impl()
{
if( _notifications_reader )
_notifications_reader->stop();
if( _metadata_reader )
_metadata_reader->stop();
}


void dds_device::impl::reset()
{
// _info should already be up-to-date
Expand All @@ -107,6 +116,8 @@ void dds_device::impl::reset()
_streams.clear();
_options.clear();
_extrinsics_map.clear();
if( _metadata_reader )
_metadata_reader->stop();
_metadata_reader.reset();
}

Expand Down
1 change: 1 addition & 0 deletions third-party/realdds/src/dds-device-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class dds_device::impl

impl( std::shared_ptr< dds_participant > const & participant,
topics::device_info const & info );
~impl();

void reset();

Expand Down
9 changes: 9 additions & 0 deletions third-party/realdds/src/dds-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ dds_stream::dds_stream( std::string const & stream_name, std::string const & sen
}


dds_stream::~dds_stream()
{
if( _reader )
_reader->stop();
}


void dds_video_stream::open( std::string const & topic_name, std::shared_ptr< dds_subscriber > const & subscriber )
{
if( is_open() )
Expand Down Expand Up @@ -63,6 +70,8 @@ void dds_motion_stream::open( std::string const & topic_name, std::shared_ptr< d

void dds_stream::close()
{
if( _reader )
_reader->stop();
_reader.reset();
}

Expand Down
48 changes: 34 additions & 14 deletions third-party/realdds/src/dds-topic-reader-thread.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2023 Intel Corporation. All Rights Reserved.
// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved.

#include <realdds/dds-topic-reader-thread.h>
#include <realdds/dds-topic.h>
Expand All @@ -10,6 +10,7 @@
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/topic/Topic.hpp>

#include <fastdds/dds/core/condition/GuardCondition.hpp>
#include <fastdds/dds/core/condition/WaitSet.hpp>

namespace realdds {
Expand All @@ -24,6 +25,7 @@ dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > c
dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic,
std::shared_ptr< dds_subscriber > const & subscriber )
: super( topic, subscriber )
, _stopped( nullptr )
{
}

Expand All @@ -40,25 +42,31 @@ void dds_topic_reader_thread::run( qos const & rqos )
DDS_THROW( runtime_error, "on-data-available must be provided" );

_reader = DDS_API_CALL( _subscriber->get()->create_datareader( _topic->get(), rqos ) );
_stopped = std::make_shared< eprosima::fastdds::dds::GuardCondition >();

_th = std::thread(
[this, name = _topic->get()->get_name()]()
[this,
OhadMeir marked this conversation as resolved.
Show resolved Hide resolved
weak = std::weak_ptr< dds_topic_reader >( shared_from_this() ), // detect lifetime
name = _topic->get()->get_name(),
stopped = _stopped] // hold a copy so the wait-set is valid even if the reader is destroyed
{
eprosima::fastdds::dds::WaitSet wait_set;
auto & condition = _reader->get_statuscondition();
condition.set_enabled_statuses( eprosima::fastdds::dds::StatusMask::data_available()
<< eprosima::fastdds::dds::StatusMask::subscription_matched()
<< eprosima::fastdds::dds::StatusMask::sample_lost() );
wait_set.attach_condition( condition );

wait_set.attach_condition( _stopped );
if( auto strong = weak.lock() )
{
auto & condition = _reader->get_statuscondition();
condition.set_enabled_statuses( eprosima::fastdds::dds::StatusMask::data_available()
<< eprosima::fastdds::dds::StatusMask::subscription_matched()
<< eprosima::fastdds::dds::StatusMask::sample_lost() );
wait_set.attach_condition( condition );

while( ! _stopped.get_trigger_value() )
wait_set.attach_condition( *stopped );
}
// We'll keep locking the object so it cannot destruct mid-callback, and exit out if we detect destruction
while( auto strong = weak.lock() )
{
eprosima::fastdds::dds::ConditionSeq active_conditions;
wait_set.wait( active_conditions, eprosima::fastrtps::c_TimeInfinite );

if( _stopped.get_trigger_value() )
if( stopped->get_trigger_value() )
break;

auto & changed = _reader->get_status_changes();
Expand All @@ -67,16 +75,22 @@ void dds_topic_reader_thread::run( qos const & rqos )
eprosima::fastdds::dds::SampleLostStatus status;
_reader->get_sample_lost_status( status );
on_sample_lost( _reader, status );
if( stopped->get_trigger_value() )
OhadMeir marked this conversation as resolved.
Show resolved Hide resolved
break;
}
if( changed.is_active( eprosima::fastdds::dds::StatusMask::data_available() ) )
{
on_data_available( _reader );
if( stopped->get_trigger_value() )
break;
}
if( changed.is_active( eprosima::fastdds::dds::StatusMask::subscription_matched() ) )
{
eprosima::fastdds::dds::SubscriptionMatchedStatus status;
_reader->get_subscription_matched_status( status );
on_subscription_matched( _reader, status );
if( stopped->get_trigger_value() )
break;
}
}
} );
Expand All @@ -87,8 +101,14 @@ void dds_topic_reader_thread::stop()
{
if( _th.joinable() )
{
_stopped.set_trigger_value( true );
_th.join();
_stopped->set_trigger_value( true );
// If we try to stop from within the thread (e.g., inside on_data_available), join() will terminate!
// If we detect such a case, then it's also possible our object will get destroyed (we may get here from the
// dtor) so we detach the thread instead.
if( _th.get_id() != std::this_thread::get_id() )
_th.join();
else
_th.detach();
}
super::stop();
}
Expand Down
4 changes: 4 additions & 0 deletions third-party/realdds/src/topics/dds-topic-names.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ namespace control {
}
namespace dfu_start {
std::string const id( "dfu-start", 9 );
namespace key {
std::string const crc( "crc", 3 );
std::string const size( "size", 4 );
}
}
namespace dfu_apply {
//using notification::dfu_apply::id;
Expand Down
Loading
Loading