Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DDS device signals, bulk query-option & misc #12517

Merged
merged 17 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ namespace librealsense {

/*static*/ std::shared_ptr< context > context::make( char const * json_settings )
{
return make( json_settings ? json::parse( json_settings ) : json::object() );
return make( ( ! json_settings || ! *json_settings ) ? json::object() : json::parse( json_settings ) );
}


Expand Down
4 changes: 2 additions & 2 deletions src/dds/rs-dds-depth-sensor-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void dds_depth_sensor_proxy::add_no_metadata( frame * const f, streaming_impl &
}


void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming )
void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json const & dds_md, streaming_impl & streaming )
{
if( auto du = rsutils::json::nested( dds_md, metadata_header_key, depth_units_key ) )
{
Expand All @@ -56,7 +56,7 @@ void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json
f->additional_data.depth_units = get_depth_scale();
}

super::add_frame_metadata( f, std::move( dds_md ), streaming );
super::add_frame_metadata( f, dds_md, streaming );
}


Expand Down
2 changes: 1 addition & 1 deletion src/dds/rs-dds-depth-sensor-proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class dds_depth_sensor_proxy

protected:
void add_no_metadata( frame *, streaming_impl & ) override;
void add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming ) override;
void add_frame_metadata( frame *, nlohmann::json const & md, streaming_impl & ) override;
};


Expand Down
20 changes: 4 additions & 16 deletions src/dds/rs-dds-device-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,13 @@ dds_device_proxy::dds_device_proxy( std::shared_ptr< const device_info > const &

if( _dds_dev->supports_metadata() )
{
_dds_dev->on_metadata_available(
[this]( nlohmann::json && dds_md )
_metadata_subscription = _dds_dev->on_metadata_available(
[this]( std::shared_ptr< const nlohmann::json > const & dds_md )
{
std::string stream_name = rsutils::json::get< std::string >( dds_md, stream_name_key );
std::string const & stream_name = rsutils::json::nested( *dds_md, stream_name_key ).string_ref();
auto it = _stream_name_to_owning_sensor.find( stream_name );
if( it != _stream_name_to_owning_sensor.end() )
it->second->handle_new_metadata( stream_name, std::move( dds_md ) );
it->second->handle_new_metadata( stream_name, dds_md );
} );
}

Expand Down Expand Up @@ -496,10 +496,6 @@ void dds_device_proxy::hardware_reset()
nlohmann::json control = nlohmann::json::object( { { "id", "hw-reset" } } );
nlohmann::json reply;
_dds_dev->send_control( control, &reply );
std::string default_status( "OK", 2 );
if( rsutils::json::get( reply, "status", default_status ) != default_status )
throw std::runtime_error( "Failed to reset: "
+ rsutils::json::get( reply, "explanation", std::string( "unknown reason" ) ) );
}


Expand All @@ -510,10 +506,6 @@ std::vector< uint8_t > dds_device_proxy::send_receive_raw_data( const std::vecto
nlohmann::json control = nlohmann::json::object( { { "id", "hwm" }, { "data", hexdata } } );
nlohmann::json reply;
_dds_dev->send_control( control, &reply );
std::string default_status( "OK", 2 );
if( rsutils::json::get( reply, "status", default_status ) != default_status )
throw std::runtime_error( "Failed HWM: "
+ rsutils::json::get( reply, "explanation", std::string( "unknown reason" ) ) );
rsutils::string::hexarray data;
if( ! rsutils::json::get_ex( reply, "data", &data ) )
throw std::runtime_error( "Failed HWM: missing 'data' in reply" );
Expand Down Expand Up @@ -541,10 +533,6 @@ std::vector< uint8_t > dds_device_proxy::build_command( uint32_t opcode,
{ "build-command", true } } );
nlohmann::json reply;
_dds_dev->send_control( control, &reply );
std::string default_status( "OK", 2 );
if( rsutils::json::get( reply, "status", default_status ) != default_status )
throw std::runtime_error( "Failed build-command: "
+ rsutils::json::get( reply, "explanation", std::string( "unknown reason" ) ) );
if( ! rsutils::json::get_ex( reply, "data", &hexdata ) )
throw std::runtime_error( "Failed HWM: missing 'data' in reply" );
return hexdata.detach();
Expand Down
2 changes: 2 additions & 0 deletions src/dds/rs-dds-device-proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class dds_device_proxy
std::map< std::string, std::shared_ptr< librealsense::stream > > _stream_name_to_librs_stream;
std::map< std::string, std::shared_ptr< dds_sensor_proxy > > _stream_name_to_owning_sensor;

rsutils::subscription _metadata_subscription;

int get_index_from_stream_name( const std::string & name ) const;
void set_profile_intrinsics( std::shared_ptr< stream_profile_interface > & profile,
const std::shared_ptr< realdds::dds_stream > & stream ) const;
Expand Down
22 changes: 14 additions & 8 deletions src/dds/rs-dds-sensor-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,16 +337,20 @@ void dds_sensor_proxy::handle_motion_data( realdds::topics::imu_msg && imu,
}


void dds_sensor_proxy::handle_new_metadata( std::string const & stream_name, nlohmann::json && dds_md )
void dds_sensor_proxy::handle_new_metadata( std::string const & stream_name,
std::shared_ptr< const nlohmann::json > const & dds_md )
{
if( ! _md_enabled )
return;

auto it = _streaming_by_name.find( stream_name );
if( it != _streaming_by_name.end() )
it->second.syncer.enqueue_metadata(
rsutils::json::get< realdds::dds_nsec >( dds_md[metadata_header_key], timestamp_key ),
std::move( dds_md ) );
{
if( auto timestamp = rsutils::json::nested( *dds_md, metadata_header_key, timestamp_key ) )
it->second.syncer.enqueue_metadata( timestamp.value< realdds::dds_nsec >(), dds_md );
else
throw std::runtime_error( "missing metadata header/timestamp" );
}
// else we're not streaming -- must be another client that's subscribed
}

Expand All @@ -361,7 +365,9 @@ void dds_sensor_proxy::add_no_metadata( frame * const f, streaming_impl & stream
}


void dds_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming )
void dds_sensor_proxy::add_frame_metadata( frame * const f,
nlohmann::json const & dds_md,
streaming_impl & streaming )
{
nlohmann::json const & md_header = rsutils::json::nested( dds_md, metadata_header_key );
nlohmann::json const & md = rsutils::json::nested( dds_md, metadata_key );
Expand Down Expand Up @@ -434,14 +440,14 @@ void dds_sensor_proxy::start( rs2_frame_callback_sptr callback )
auto & streaming = _streaming_by_name[dds_stream->name()];
streaming.syncer.on_frame_release( frame_releaser );
streaming.syncer.on_frame_ready(
[this, &streaming]( syncer_type::frame_holder && fh, nlohmann::json && md )
[this, &streaming]( syncer_type::frame_holder && fh, std::shared_ptr< const nlohmann::json > const & md )
{
if( _is_streaming ) // stop was not called
{
if( md.empty() )
if( ! md )
add_no_metadata( static_cast< frame * >( fh.get() ), streaming );
else
add_frame_metadata( static_cast< frame * >( fh.get() ), std::move( md ), streaming );
add_frame_metadata( static_cast< frame * >( fh.get() ), *md, streaming );
invoke_new_frame( static_cast< frame * >( fh.release() ), nullptr, nullptr );
}
} );
Expand Down
5 changes: 3 additions & 2 deletions src/dds/rs-dds-sensor-proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ class dds_sensor_proxy : public software_sensor
void handle_motion_data( realdds::topics::imu_msg &&,
const std::shared_ptr< stream_profile_interface > &,
streaming_impl & );
void handle_new_metadata( std::string const & stream_name, nlohmann::json && metadata );
void handle_new_metadata( std::string const & stream_name,
std::shared_ptr< const nlohmann::json > const & metadata );

virtual void add_no_metadata( frame *, streaming_impl & );
virtual void add_frame_metadata( frame * const, nlohmann::json && metadata, streaming_impl & );
virtual void add_frame_metadata( frame *, nlohmann::json const & metadata, streaming_impl & );

friend class dds_device_proxy; // Currently calls handle_new_metadata
};
Expand Down
4 changes: 3 additions & 1 deletion src/ds/d500/d500-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace librealsense
namespace ds
{
const uint16_t D555E_PID = 0x0B56;
const uint16_t D555E_RECOVERY_PID = 0x0ADE;

namespace xu_id
{
Expand All @@ -37,7 +38,8 @@ namespace librealsense
};

static const std::map< std::uint16_t, std::string > rs500_sku_names = {
{ ds::D555E_PID, "Intel RealSense D555e" }
{ ds::D555E_PID, "Intel RealSense D555e" },
{ ds::D555E_RECOVERY_PID, "Intel RealSense D555e Recovery" }
};

bool d500_try_fetch_usb_device(std::vector<platform::usb_device_info>& devices,
Expand Down
4 changes: 4 additions & 0 deletions src/fw-update/fw-update-factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace librealsense
return RS2_PRODUCT_LINE_D400;
if( ds::RS_D400_USB2_RECOVERY_PID == usb_info.pid )
return RS2_PRODUCT_LINE_D400;
if( ds::D555E_RECOVERY_PID == usb_info.pid )
return RS2_PRODUCT_LINE_D500;
return 0;
}

Expand Down Expand Up @@ -57,6 +59,8 @@ namespace librealsense
case ds::RS_D400_RECOVERY_PID:
case ds::RS_D400_USB2_RECOVERY_PID:
return std::make_shared< ds_d400_update_device >( shared_from_this(), usb );
case ds::D555E_RECOVERY_PID:
return std::make_shared< ds_d500_update_device >( shared_from_this(), usb );
default:
// Do nothing
break;
Expand Down
3 changes: 3 additions & 0 deletions third-party/realdds/doc/control.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Returns or sets the current `value` of an option within a device.

- `id` is `query-option` or `set-option`
- `option-name` is the unique option name within its owner
- Usually this is a single string value, but can also be an array of names for bulk queries: `["option1", "option2", ...]`
- `stream-name` is the unique name of the stream it's in within the device
- for a device option, this may be omitted
- for `set-option`, `value` is the new value (float)
Expand All @@ -75,6 +76,8 @@ Returns or sets the current `value` of an option within a device.
The reply should include the original control, plus:

- `value` will include the current or new value
- In the case of bulk queries, this will be an array of values, in the same size and ordering as the control's `option-name` array: `[<value-of-option1>, <value-of-option2>, ...]`
- If the `option-name` array is empty, all option values will be returned in a `option-values` key, instead (see below)

```JSON
{
Expand Down
16 changes: 11 additions & 5 deletions third-party/realdds/include/realdds/dds-device.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "dds-stream-profile.h"
#include "dds-stream.h"

#include <rsutils/subscription.h>
#include <nlohmann/json_fwd.hpp>
#include <memory>
#include <vector>
Expand Down Expand Up @@ -46,6 +47,11 @@ class dds_device
// Wait until ready. Will throw if not ready within the timeout!
void wait_until_ready( size_t timeout_ns = 5000 );

// Utility function for checking replies:
// If 'p_explanation' is nullptr, will throw if the reply status is not 'ok'.
// Otherise will return a false if not 'ok', and the explanation will be filled out.
static bool check_reply( nlohmann::json const & reply, std::string * p_explanation = nullptr );

//----------- below this line, a device must be running!

size_t number_of_streams() const;
Expand All @@ -65,16 +71,16 @@ class dds_device

bool supports_metadata() const;

typedef std::function< void( nlohmann::json && md ) > on_metadata_available_callback;
void on_metadata_available( on_metadata_available_callback cb );
typedef std::function< void( std::shared_ptr< const nlohmann::json > const & md ) > on_metadata_available_callback;
rsutils::subscription on_metadata_available( on_metadata_available_callback && );

typedef std::function< void(
dds_time const & timestamp, char type, std::string const & text, nlohmann::json const & data ) >
on_device_log_callback;
void on_device_log( on_device_log_callback cb );
rsutils::subscription on_device_log( on_device_log_callback && cb );

typedef std::function< bool( std::string const & id, nlohmann::json const & ) > on_notification_callback;
void on_notification( on_notification_callback );
typedef std::function< void( std::string const & id, nlohmann::json const & ) > on_notification_callback;
rsutils::subscription on_notification( on_notification_callback && );

private:
class impl;
Expand Down
14 changes: 7 additions & 7 deletions third-party/realdds/include/realdds/dds-metadata-syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ class dds_metadata_syncer
typedef std::unique_ptr< frame_type, on_frame_release_callback > frame_holder;

// Metadata is intended to be JSON
typedef nlohmann::json metadata_type;
typedef std::shared_ptr< const nlohmann::json > metadata_type;

// So our main callback gets this generic frame and metadata:
typedef std::function< void( frame_holder &&, metadata_type && metadata ) > on_frame_ready_callback;
typedef std::function< void( frame_holder &&, metadata_type const & metadata ) > on_frame_ready_callback;

// And we provide other callbacks, for control, testing, etc.
typedef std::function< void( key_type, metadata_type && ) > on_metadata_dropped_callback;
typedef std::function< void( key_type, metadata_type const & ) > on_metadata_dropped_callback;

private:
using key_frame = std::pair< key_type, frame_holder >;
Expand All @@ -70,9 +70,9 @@ class dds_metadata_syncer
std::deque< key_metadata > _metadata_queue;
std::mutex _queues_lock;

on_frame_release_callback _on_frame_release = nullptr;
on_frame_ready_callback _on_frame_ready = nullptr;
on_metadata_dropped_callback _on_metadata_dropped = nullptr;
on_frame_release_callback _on_frame_release;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove the initialization?
Even if it is the same, explicit is better then implicit :-)

Copy link
Collaborator Author

@maloel maloel Dec 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree unless it's obvious. Anyway if you decide to do something different in the ctor then this will be in conflict. I'd rather people went to the ctor to check (unless there's none).

on_frame_ready_callback _on_frame_ready;
on_metadata_dropped_callback _on_metadata_dropped;

std::shared_ptr< bool > _is_alive; // Ensures object can be accessed

Expand All @@ -81,7 +81,7 @@ class dds_metadata_syncer
virtual ~dds_metadata_syncer();

void enqueue_frame( key_type, frame_holder && );
void enqueue_metadata( key_type, metadata_type && );
void enqueue_metadata( key_type, metadata_type const & );

void on_frame_release( on_frame_release_callback cb ) { _on_frame_release = cb; }
void on_frame_ready( on_frame_ready_callback cb ) { _on_frame_ready = cb; }
Expand Down
6 changes: 5 additions & 1 deletion third-party/realdds/include/realdds/dds-topic-writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <nlohmann/json_fwd.hpp>
#include <memory>
#include <atomic>


namespace eprosima {
Expand Down Expand Up @@ -39,7 +40,7 @@ class dds_topic_writer : protected eprosima::fastdds::dds::DataWriterListener

eprosima::fastdds::dds::DataWriter * _writer = nullptr;

int _n_readers = 0;
std::atomic< int > _n_readers;

public:
dds_topic_writer( std::shared_ptr< dds_topic > const & topic );
Expand Down Expand Up @@ -80,6 +81,9 @@ class dds_topic_writer : protected eprosima::fastdds::dds::DataWriterListener
// The callbacks should be set before we actually create the underlying DDS objects, so the writer does not
void run( qos const & = qos() );

// Waits until readers are detected; return false on timeout
bool wait_for_readers( dds_time timeout );

// Waits until all changes were acknowledged; return false on timeout
bool wait_for_acks( dds_time timeout );

Expand Down
Loading
Loading