diff --git a/src/context.cpp b/src/context.cpp index 2d3d3d7ef9..a7549fe617 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -98,7 +98,7 @@ namespace librealsense { /*static*/ std::shared_ptr< context > context::make( char const * json_settings ) { - return make( json_settings ? json::parse( json_settings ) : json::object() ); + return make( ( ! json_settings || ! *json_settings ) ? json::object() : json::parse( json_settings ) ); } diff --git a/src/dds/rs-dds-depth-sensor-proxy.cpp b/src/dds/rs-dds-depth-sensor-proxy.cpp index 68c6089a0f..3417229e15 100644 --- a/src/dds/rs-dds-depth-sensor-proxy.cpp +++ b/src/dds/rs-dds-depth-sensor-proxy.cpp @@ -38,7 +38,7 @@ void dds_depth_sensor_proxy::add_no_metadata( frame * const f, streaming_impl & } -void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming ) +void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json const & dds_md, streaming_impl & streaming ) { if( auto du = rsutils::json::nested( dds_md, metadata_header_key, depth_units_key ) ) { @@ -56,7 +56,7 @@ void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json f->additional_data.depth_units = get_depth_scale(); } - super::add_frame_metadata( f, std::move( dds_md ), streaming ); + super::add_frame_metadata( f, dds_md, streaming ); } diff --git a/src/dds/rs-dds-depth-sensor-proxy.h b/src/dds/rs-dds-depth-sensor-proxy.h index 26ace9dd1f..3b72b296d8 100644 --- a/src/dds/rs-dds-depth-sensor-proxy.h +++ b/src/dds/rs-dds-depth-sensor-proxy.h @@ -29,7 +29,7 @@ class dds_depth_sensor_proxy protected: void add_no_metadata( frame *, streaming_impl & ) override; - void add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming ) override; + void add_frame_metadata( frame *, nlohmann::json const & md, streaming_impl & ) override; }; diff --git a/src/dds/rs-dds-device-proxy.cpp b/src/dds/rs-dds-device-proxy.cpp index 6013a8260e..de3165a06c 100644 --- a/src/dds/rs-dds-device-proxy.cpp +++ b/src/dds/rs-dds-device-proxy.cpp @@ -282,13 +282,13 @@ dds_device_proxy::dds_device_proxy( std::shared_ptr< const device_info > const & if( _dds_dev->supports_metadata() ) { - _dds_dev->on_metadata_available( - [this]( nlohmann::json && dds_md ) + _metadata_subscription = _dds_dev->on_metadata_available( + [this]( std::shared_ptr< const nlohmann::json > const & dds_md ) { - std::string stream_name = rsutils::json::get< std::string >( dds_md, stream_name_key ); + std::string const & stream_name = rsutils::json::nested( *dds_md, stream_name_key ).string_ref(); auto it = _stream_name_to_owning_sensor.find( stream_name ); if( it != _stream_name_to_owning_sensor.end() ) - it->second->handle_new_metadata( stream_name, std::move( dds_md ) ); + it->second->handle_new_metadata( stream_name, dds_md ); } ); } @@ -496,10 +496,6 @@ void dds_device_proxy::hardware_reset() nlohmann::json control = nlohmann::json::object( { { "id", "hw-reset" } } ); nlohmann::json reply; _dds_dev->send_control( control, &reply ); - std::string default_status( "OK", 2 ); - if( rsutils::json::get( reply, "status", default_status ) != default_status ) - throw std::runtime_error( "Failed to reset: " - + rsutils::json::get( reply, "explanation", std::string( "unknown reason" ) ) ); } @@ -510,10 +506,6 @@ std::vector< uint8_t > dds_device_proxy::send_receive_raw_data( const std::vecto nlohmann::json control = nlohmann::json::object( { { "id", "hwm" }, { "data", hexdata } } ); nlohmann::json reply; _dds_dev->send_control( control, &reply ); - std::string default_status( "OK", 2 ); - if( rsutils::json::get( reply, "status", default_status ) != default_status ) - throw std::runtime_error( "Failed HWM: " - + rsutils::json::get( reply, "explanation", std::string( "unknown reason" ) ) ); rsutils::string::hexarray data; if( ! rsutils::json::get_ex( reply, "data", &data ) ) throw std::runtime_error( "Failed HWM: missing 'data' in reply" ); @@ -541,10 +533,6 @@ std::vector< uint8_t > dds_device_proxy::build_command( uint32_t opcode, { "build-command", true } } ); nlohmann::json reply; _dds_dev->send_control( control, &reply ); - std::string default_status( "OK", 2 ); - if( rsutils::json::get( reply, "status", default_status ) != default_status ) - throw std::runtime_error( "Failed build-command: " - + rsutils::json::get( reply, "explanation", std::string( "unknown reason" ) ) ); if( ! rsutils::json::get_ex( reply, "data", &hexdata ) ) throw std::runtime_error( "Failed HWM: missing 'data' in reply" ); return hexdata.detach(); diff --git a/src/dds/rs-dds-device-proxy.h b/src/dds/rs-dds-device-proxy.h index 81fda6d552..aaab40fea2 100644 --- a/src/dds/rs-dds-device-proxy.h +++ b/src/dds/rs-dds-device-proxy.h @@ -44,6 +44,8 @@ class dds_device_proxy std::map< std::string, std::shared_ptr< librealsense::stream > > _stream_name_to_librs_stream; std::map< std::string, std::shared_ptr< dds_sensor_proxy > > _stream_name_to_owning_sensor; + rsutils::subscription _metadata_subscription; + int get_index_from_stream_name( const std::string & name ) const; void set_profile_intrinsics( std::shared_ptr< stream_profile_interface > & profile, const std::shared_ptr< realdds::dds_stream > & stream ) const; diff --git a/src/dds/rs-dds-sensor-proxy.cpp b/src/dds/rs-dds-sensor-proxy.cpp index 1e4534c776..54fcf6dca6 100644 --- a/src/dds/rs-dds-sensor-proxy.cpp +++ b/src/dds/rs-dds-sensor-proxy.cpp @@ -337,16 +337,20 @@ void dds_sensor_proxy::handle_motion_data( realdds::topics::imu_msg && imu, } -void dds_sensor_proxy::handle_new_metadata( std::string const & stream_name, nlohmann::json && dds_md ) +void dds_sensor_proxy::handle_new_metadata( std::string const & stream_name, + std::shared_ptr< const nlohmann::json > const & dds_md ) { if( ! _md_enabled ) return; auto it = _streaming_by_name.find( stream_name ); if( it != _streaming_by_name.end() ) - it->second.syncer.enqueue_metadata( - rsutils::json::get< realdds::dds_nsec >( dds_md[metadata_header_key], timestamp_key ), - std::move( dds_md ) ); + { + if( auto timestamp = rsutils::json::nested( *dds_md, metadata_header_key, timestamp_key ) ) + it->second.syncer.enqueue_metadata( timestamp.value< realdds::dds_nsec >(), dds_md ); + else + throw std::runtime_error( "missing metadata header/timestamp" ); + } // else we're not streaming -- must be another client that's subscribed } @@ -361,7 +365,9 @@ void dds_sensor_proxy::add_no_metadata( frame * const f, streaming_impl & stream } -void dds_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming ) +void dds_sensor_proxy::add_frame_metadata( frame * const f, + nlohmann::json const & dds_md, + streaming_impl & streaming ) { nlohmann::json const & md_header = rsutils::json::nested( dds_md, metadata_header_key ); nlohmann::json const & md = rsutils::json::nested( dds_md, metadata_key ); @@ -434,14 +440,14 @@ void dds_sensor_proxy::start( rs2_frame_callback_sptr callback ) auto & streaming = _streaming_by_name[dds_stream->name()]; streaming.syncer.on_frame_release( frame_releaser ); streaming.syncer.on_frame_ready( - [this, &streaming]( syncer_type::frame_holder && fh, nlohmann::json && md ) + [this, &streaming]( syncer_type::frame_holder && fh, std::shared_ptr< const nlohmann::json > const & md ) { if( _is_streaming ) // stop was not called { - if( md.empty() ) + if( ! md ) add_no_metadata( static_cast< frame * >( fh.get() ), streaming ); else - add_frame_metadata( static_cast< frame * >( fh.get() ), std::move( md ), streaming ); + add_frame_metadata( static_cast< frame * >( fh.get() ), *md, streaming ); invoke_new_frame( static_cast< frame * >( fh.release() ), nullptr, nullptr ); } } ); diff --git a/src/dds/rs-dds-sensor-proxy.h b/src/dds/rs-dds-sensor-proxy.h index 79d0274743..e336077669 100644 --- a/src/dds/rs-dds-sensor-proxy.h +++ b/src/dds/rs-dds-sensor-proxy.h @@ -95,10 +95,11 @@ class dds_sensor_proxy : public software_sensor void handle_motion_data( realdds::topics::imu_msg &&, const std::shared_ptr< stream_profile_interface > &, streaming_impl & ); - void handle_new_metadata( std::string const & stream_name, nlohmann::json && metadata ); + void handle_new_metadata( std::string const & stream_name, + std::shared_ptr< const nlohmann::json > const & metadata ); virtual void add_no_metadata( frame *, streaming_impl & ); - virtual void add_frame_metadata( frame * const, nlohmann::json && metadata, streaming_impl & ); + virtual void add_frame_metadata( frame *, nlohmann::json const & metadata, streaming_impl & ); friend class dds_device_proxy; // Currently calls handle_new_metadata }; diff --git a/src/ds/d500/d500-private.h b/src/ds/d500/d500-private.h index a6ccabfb60..ac3ebe496a 100644 --- a/src/ds/d500/d500-private.h +++ b/src/ds/d500/d500-private.h @@ -14,6 +14,7 @@ namespace librealsense namespace ds { const uint16_t D555E_PID = 0x0B56; + const uint16_t D555E_RECOVERY_PID = 0x0ADE; namespace xu_id { @@ -37,7 +38,8 @@ namespace librealsense }; static const std::map< std::uint16_t, std::string > rs500_sku_names = { - { ds::D555E_PID, "Intel RealSense D555e" } + { ds::D555E_PID, "Intel RealSense D555e" }, + { ds::D555E_RECOVERY_PID, "Intel RealSense D555e Recovery" } }; bool d500_try_fetch_usb_device(std::vector& devices, diff --git a/src/fw-update/fw-update-factory.cpp b/src/fw-update/fw-update-factory.cpp index 9e7477c608..bc1e380e14 100644 --- a/src/fw-update/fw-update-factory.cpp +++ b/src/fw-update/fw-update-factory.cpp @@ -23,6 +23,8 @@ namespace librealsense return RS2_PRODUCT_LINE_D400; if( ds::RS_D400_USB2_RECOVERY_PID == usb_info.pid ) return RS2_PRODUCT_LINE_D400; + if( ds::D555E_RECOVERY_PID == usb_info.pid ) + return RS2_PRODUCT_LINE_D500; return 0; } @@ -57,6 +59,8 @@ namespace librealsense case ds::RS_D400_RECOVERY_PID: case ds::RS_D400_USB2_RECOVERY_PID: return std::make_shared< ds_d400_update_device >( shared_from_this(), usb ); + case ds::D555E_RECOVERY_PID: + return std::make_shared< ds_d500_update_device >( shared_from_this(), usb ); default: // Do nothing break; diff --git a/third-party/realdds/doc/control.md b/third-party/realdds/doc/control.md index 9de15e924f..e56778efdf 100644 --- a/third-party/realdds/doc/control.md +++ b/third-party/realdds/doc/control.md @@ -61,6 +61,7 @@ Returns or sets the current `value` of an option within a device. - `id` is `query-option` or `set-option` - `option-name` is the unique option name within its owner + - Usually this is a single string value, but can also be an array of names for bulk queries: `["option1", "option2", ...]` - `stream-name` is the unique name of the stream it's in within the device - for a device option, this may be omitted - for `set-option`, `value` is the new value (float) @@ -75,6 +76,8 @@ Returns or sets the current `value` of an option within a device. The reply should include the original control, plus: - `value` will include the current or new value + - In the case of bulk queries, this will be an array of values, in the same size and ordering as the control's `option-name` array: `[, , ...]` + - If the `option-name` array is empty, all option values will be returned in a `option-values` key, instead (see below) ```JSON { diff --git a/third-party/realdds/include/realdds/dds-device.h b/third-party/realdds/include/realdds/dds-device.h index 0fd4752ceb..b11b464b9e 100644 --- a/third-party/realdds/include/realdds/dds-device.h +++ b/third-party/realdds/include/realdds/dds-device.h @@ -7,6 +7,7 @@ #include "dds-stream-profile.h" #include "dds-stream.h" +#include #include #include #include @@ -46,6 +47,11 @@ class dds_device // Wait until ready. Will throw if not ready within the timeout! void wait_until_ready( size_t timeout_ns = 5000 ); + // Utility function for checking replies: + // If 'p_explanation' is nullptr, will throw if the reply status is not 'ok'. + // Otherise will return a false if not 'ok', and the explanation will be filled out. + static bool check_reply( nlohmann::json const & reply, std::string * p_explanation = nullptr ); + //----------- below this line, a device must be running! size_t number_of_streams() const; @@ -65,16 +71,16 @@ class dds_device bool supports_metadata() const; - typedef std::function< void( nlohmann::json && md ) > on_metadata_available_callback; - void on_metadata_available( on_metadata_available_callback cb ); + typedef std::function< void( std::shared_ptr< const nlohmann::json > const & md ) > on_metadata_available_callback; + rsutils::subscription on_metadata_available( on_metadata_available_callback && ); typedef std::function< void( dds_time const & timestamp, char type, std::string const & text, nlohmann::json const & data ) > on_device_log_callback; - void on_device_log( on_device_log_callback cb ); + rsutils::subscription on_device_log( on_device_log_callback && cb ); - typedef std::function< bool( std::string const & id, nlohmann::json const & ) > on_notification_callback; - void on_notification( on_notification_callback ); + typedef std::function< void( std::string const & id, nlohmann::json const & ) > on_notification_callback; + rsutils::subscription on_notification( on_notification_callback && ); private: class impl; diff --git a/third-party/realdds/include/realdds/dds-metadata-syncer.h b/third-party/realdds/include/realdds/dds-metadata-syncer.h index 5366dce054..1142e24bfd 100644 --- a/third-party/realdds/include/realdds/dds-metadata-syncer.h +++ b/third-party/realdds/include/realdds/dds-metadata-syncer.h @@ -54,13 +54,13 @@ class dds_metadata_syncer typedef std::unique_ptr< frame_type, on_frame_release_callback > frame_holder; // Metadata is intended to be JSON - typedef nlohmann::json metadata_type; + typedef std::shared_ptr< const nlohmann::json > metadata_type; // So our main callback gets this generic frame and metadata: - typedef std::function< void( frame_holder &&, metadata_type && metadata ) > on_frame_ready_callback; + typedef std::function< void( frame_holder &&, metadata_type const & metadata ) > on_frame_ready_callback; // And we provide other callbacks, for control, testing, etc. - typedef std::function< void( key_type, metadata_type && ) > on_metadata_dropped_callback; + typedef std::function< void( key_type, metadata_type const & ) > on_metadata_dropped_callback; private: using key_frame = std::pair< key_type, frame_holder >; @@ -70,9 +70,9 @@ class dds_metadata_syncer std::deque< key_metadata > _metadata_queue; std::mutex _queues_lock; - on_frame_release_callback _on_frame_release = nullptr; - on_frame_ready_callback _on_frame_ready = nullptr; - on_metadata_dropped_callback _on_metadata_dropped = nullptr; + on_frame_release_callback _on_frame_release; + on_frame_ready_callback _on_frame_ready; + on_metadata_dropped_callback _on_metadata_dropped; std::shared_ptr< bool > _is_alive; // Ensures object can be accessed @@ -81,7 +81,7 @@ class dds_metadata_syncer virtual ~dds_metadata_syncer(); void enqueue_frame( key_type, frame_holder && ); - void enqueue_metadata( key_type, metadata_type && ); + void enqueue_metadata( key_type, metadata_type const & ); void on_frame_release( on_frame_release_callback cb ) { _on_frame_release = cb; } void on_frame_ready( on_frame_ready_callback cb ) { _on_frame_ready = cb; } diff --git a/third-party/realdds/include/realdds/dds-topic-writer.h b/third-party/realdds/include/realdds/dds-topic-writer.h index 262fe931e1..c43daa0731 100644 --- a/third-party/realdds/include/realdds/dds-topic-writer.h +++ b/third-party/realdds/include/realdds/dds-topic-writer.h @@ -9,6 +9,7 @@ #include #include +#include namespace eprosima { @@ -39,7 +40,7 @@ class dds_topic_writer : protected eprosima::fastdds::dds::DataWriterListener eprosima::fastdds::dds::DataWriter * _writer = nullptr; - int _n_readers = 0; + std::atomic< int > _n_readers; public: dds_topic_writer( std::shared_ptr< dds_topic > const & topic ); @@ -80,6 +81,9 @@ class dds_topic_writer : protected eprosima::fastdds::dds::DataWriterListener // The callbacks should be set before we actually create the underlying DDS objects, so the writer does not void run( qos const & = qos() ); + // Waits until readers are detected; return false on timeout + bool wait_for_readers( dds_time timeout ); + // Waits until all changes were acknowledged; return false on timeout bool wait_for_acks( dds_time timeout ); diff --git a/third-party/realdds/py/pyrealdds.cpp b/third-party/realdds/py/pyrealdds.cpp index 7dd61e4c33..58a1fbf1e4 100644 --- a/third-party/realdds/py/pyrealdds.cpp +++ b/third-party/realdds/py/pyrealdds.cpp @@ -358,6 +358,7 @@ PYBIND11_MODULE(NAME, m) { .def( "topic", &dds_topic_writer::topic ) .def( "run", &dds_topic_writer::run ) .def( "has_readers", &dds_topic_writer::has_readers ) + .def( "wait_for_readers", &dds_topic_writer::wait_for_readers ) .def( "wait_for_acks", &dds_topic_writer::wait_for_acks ) .def_static( "qos", []() { return writer_qos(); } ) .def_static( "qos", []( reliability r, durability d ) { return writer_qos( r, d ); } ); @@ -867,6 +868,25 @@ PYBIND11_MODULE(NAME, m) { .def( "set_gyro_intrinsics", &dds_motion_stream::set_gyro_intrinsics ) .def( "set_accel_intrinsics", &dds_motion_stream::set_accel_intrinsics ); + + using subscription = rsutils::subscription; + py::class_< subscription, + std::shared_ptr< subscription > // handled with a shared_ptr + >( m, "subscription" ) + .def( "cancel", &subscription::cancel ) + .def( "__bool__", []( subscription const & self ) { return self.is_active(); } ) + .def( "__repr__", + []( subscription const & self ) + { + std::ostringstream os; + os << "<" SNAME ".subscription"; + if( ! self.is_active() ) + os << " cancelled"; + os << ">"; + return os.str(); + } ); + + using realdds::dds_device; py::class_< dds_device, std::shared_ptr< dds_device > // handled with a shared_ptr @@ -881,22 +901,27 @@ PYBIND11_MODULE(NAME, m) { &dds_device::wait_until_ready, py::call_guard< py::gil_scoped_release >(), "timeout-ms"_a = 5000 ) - .def( FN_FWD( dds_device, - on_metadata_available, - ( dds_device &, py::object && ), - ( nlohmann::json && j ), - callback( self, json_to_py( j ) ); ) ) - .def( FN_FWD( dds_device, - on_device_log, - (dds_device &, dds_time const &, char, std::string const &, py::object && ), - (dds_time const & timestamp, char type, std::string const & text, nlohmann::json const & data), - callback( self, timestamp, type, text, json_to_py( data ) ); ) ) - .def( FN_FWD_R( dds_device, - on_notification, - false, - (dds_device &, std::string const &, py::object &&), - ( std::string const & id, nlohmann::json const & data ), - return callback( self, id, json_to_py( data ) ); ) ) + .def( "on_metadata_available", + []( dds_device & self, std::function< void( dds_device &, py::object && ) > callback ) + { + return std::make_shared< subscription >( self.on_metadata_available( + [&self, callback]( std::shared_ptr< const nlohmann::json > const & pj ) + { FN_FWD_CALL( dds_device, "on_metadata_available", callback( self, json_to_py( *pj ) ); ) } ) ); + } ) + .def( "on_device_log", + []( dds_device & self, std::function< void( dds_device &, dds_time const &, char, std::string const &, py::object && ) > callback ) + { + return std::make_shared< subscription >( self.on_device_log( + [&self, callback]( dds_time const & timestamp, char type, std::string const & text, nlohmann::json const & data ) + { FN_FWD_CALL( dds_device, "on_device_log", callback( self, timestamp, type, text, json_to_py( data ) ); ) } ) ); + } ) + .def( "on_notification", + []( dds_device & self, std::function< void( dds_device &, std::string const &, py::object && ) > callback ) + { + return std::make_shared< subscription >( self.on_notification( + [&self, callback]( std::string const & id, nlohmann::json const & data ) + { FN_FWD_CALL( dds_device, "on_notification", callback( self, id, json_to_py( data ) ); ) } ) ); + } ) .def( "n_streams", &dds_device::number_of_streams ) .def( "streams", []( dds_device const & self ) { @@ -923,6 +948,7 @@ PYBIND11_MODULE(NAME, m) { }, py::arg( "json" ), py::arg( "wait-for-reply" ) = false, py::call_guard< py::gil_scoped_release >() ) + .def_static( "check_reply", &dds_device::check_reply ) .def( "__repr__", []( dds_device const & self ) { std::ostringstream os; os << "<" SNAME ".device"; @@ -1026,16 +1052,18 @@ PYBIND11_MODULE(NAME, m) { .def( py::init<>() ) .def( FN_FWD( dds_metadata_syncer, on_frame_ready, - ( dds_metadata_syncer::frame_type, nlohmann::json && ), - ( dds_metadata_syncer::frame_holder && fh, nlohmann::json && metadata ), - callback( self.get_frame( fh ), std::move( metadata ) ); ) ) + ( dds_metadata_syncer::frame_type, nlohmann::json const & ), + ( dds_metadata_syncer::frame_holder && fh, std::shared_ptr< const nlohmann::json > const & metadata ), + callback( self.get_frame( fh ), metadata ? *metadata : nlohmann::json() ); ) ) .def( FN_FWD( dds_metadata_syncer, on_metadata_dropped, - ( dds_metadata_syncer::key_type, nlohmann::json && ), - ( dds_metadata_syncer::key_type key, nlohmann::json && metadata ), - callback( key, std::move( metadata ) ); ) ) + ( dds_metadata_syncer::key_type, nlohmann::json const & ), + ( dds_metadata_syncer::key_type key, std::shared_ptr< const nlohmann::json > const & metadata ), + callback( key, metadata ? *metadata : nlohmann::json() ); ) ) .def( "enqueue_frame", &dds_metadata_syncer::enqueue_frame ) - .def( "enqueue_metadata", &dds_metadata_syncer::enqueue_metadata ); + .def( "enqueue_metadata", + []( dds_metadata_syncer & self, dds_metadata_syncer::key_type key, nlohmann::json const & j ) + { self.enqueue_metadata( key, std::make_shared< const nlohmann::json >( j ) ); } ); metadata_syncer.attr( "max_frame_queue_size" ) = dds_metadata_syncer::max_frame_queue_size; metadata_syncer.attr( "max_md_queue_size" ) = dds_metadata_syncer::max_md_queue_size; } diff --git a/third-party/realdds/src/dds-device-impl.cpp b/third-party/realdds/src/dds-device-impl.cpp index 6a6a879eee..157681d0c5 100644 --- a/third-party/realdds/src/dds-device-impl.cpp +++ b/third-party/realdds/src/dds-device-impl.cpp @@ -32,12 +32,10 @@ static std::string const id_device_options( "device-options", 14 ); static std::string const id_stream_header( "stream-header", 13 ); static std::string const id_stream_options( "stream-options", 14 ); static std::string const value_key( "value", 5 ); +static std::string const option_values_key( "option-values", 13 ); static std::string const sample_key( "sample", 6 ); -static std::string const status_key( "status", 6 ); -static std::string const status_ok( "ok", 2 ); static std::string const option_name_key( "option-name", 11 ); static std::string const stream_name_key( "stream-name", 11 ); -static std::string const explanation_key( "explanation", 11 ); static std::string const control_key( "control", 7 ); static std::string const id_log( "log", 3 ); @@ -100,19 +98,19 @@ void dds_device::impl::set_state( state_t new_state ) nlohmann::json md_settings = rsutils::json::nested( _device_settings, "metadata" ); if( ! md_settings.is_null() && ! md_settings.is_object() ) // not found is null { - LOG_DEBUG( "... metadata is available but device/metadata is disabled" ); + LOG_DEBUG( "[" << debug_name() << "] ... metadata is available but device/metadata is disabled" ); _metadata_reader.reset(); } else { - LOG_DEBUG( "... metadata is enabled" ); + LOG_DEBUG( "[" << debug_name() << "] ... metadata is enabled" ); dds_topic_reader::qos rqos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ); rqos.history().depth = 10; // Support receive metadata from multiple streams rqos.override_from_json( md_settings ); _metadata_reader->run( rqos ); } } - LOG_DEBUG( "device '" << _info.debug_name() << "' (" << _participant->print( guid() ) << ") is ready" ); + LOG_DEBUG( "[" << debug_name() << "] device is ready" ); } _state = new_state; @@ -152,17 +150,22 @@ dds_guid const & dds_device::impl::guid() const } +std::string dds_device::impl::debug_name() const +{ + return rsutils::string::from() << _info.debug_name() << _participant->print( guid() ); +} + + void dds_device::impl::wait_until_ready( size_t timeout_ms ) { if( is_ready() ) return; - LOG_DEBUG( "waiting for '" << _info.debug_name() << "' ..." ); rsutils::time::timer timer{ std::chrono::milliseconds( timeout_ms ) }; do { if( timer.has_expired() ) - DDS_THROW( runtime_error, "timeout waiting for '" << _info.debug_name() << "'" ); + DDS_THROW( runtime_error, "timeout waiting for '" << debug_name() << "'" ); std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) ); } while( ! is_ready() ); @@ -179,16 +182,15 @@ void dds_device::impl::handle_notification( nlohmann::json const & j, auto it = _notification_handlers.find( id ); if( it != _notification_handlers.end() ) ( this->*( it->second ) )( j, sample ); - else if( ! _on_notification || ! _on_notification( id, j ) ) - throw std::runtime_error( "unhandled" ); + _on_notification.raise( id, j ); } catch( std::exception const & e ) { - LOG_DEBUG( "notification error: " << e.what() << " " << j ); + LOG_DEBUG( "[" << debug_name() << "] notification error: " << e.what() << "\n " << j ); } catch( ... ) { - LOG_DEBUG( "notification error: unknown exception " << j ); + LOG_DEBUG( "[" << debug_name() << "] notification error: unknown exception\n " << j ); } try @@ -213,17 +215,22 @@ void dds_device::impl::handle_notification( nlohmann::json const & j, replyit->second = std::move( j ); _replies_cv.notify_all(); } + else + { + // Nobody's waiting for it - but we can still log any errors: + dds_device::check_reply( j ); + } } } } } catch( std::exception const & e ) { - LOG_DEBUG( "reply error: " << e.what() << " " << j ); + LOG_DEBUG( "[" << debug_name() << "] reply error: " << e.what() << " " << j ); } catch( ... ) { - LOG_DEBUG( "reply error: unknown exception " << j ); + LOG_DEBUG( "[" << debug_name() << "] reply error: unknown exception " << j ); } } @@ -235,54 +242,88 @@ void dds_device::impl::on_option_value( nlohmann::json const & j, eprosima::fast // This is the notification for "set-option" or "query-option", meaning someone sent a control request to set/get an // option value. In either case a value will be sent; we want to update ours accordingly to reflect the latest: - if( rsutils::json::get( j, status_key, status_ok ) != status_ok ) - { - // Ignore errors - throw std::runtime_error( "status not OK" ); - } - float new_value; - if( ! rsutils::json::get_ex( j, value_key, &new_value ) ) - { - throw std::runtime_error( "missing value" ); - } + + // Ignore errors + dds_device::check_reply( j ); + // We need the original control request as part of the reply, otherwise we can't know what option this is for - auto it = j.find( control_key ); - if( it == j.end() ) - { - throw std::runtime_error( "missing control" ); - } - nlohmann::json const & control = it.value(); + rsutils::json::nested control( j, control_key ); if( ! control.is_object() ) - { - throw std::runtime_error( "control is not an object" ); - } - std::string option_name; - if( ! rsutils::json::get_ex( control, option_name_key, &option_name ) ) - { - throw std::runtime_error( "missing control/option-name" ); - } - - // Find the option and set its value + throw std::runtime_error( "missing control object" ); + + // Find the relevant (stream) options to update dds_options const * options = &_options; std::string stream_name; // default = empty = device option - if( rsutils::json::get_ex( control, stream_name_key, &stream_name ) && !stream_name.empty() ) + if( rsutils::json::get_ex( control, stream_name_key, &stream_name ) && ! stream_name.empty() ) { auto stream_it = _streams.find( stream_name ); if( stream_it == _streams.end() ) + throw std::runtime_error( "stream '" + stream_name + "' not found" ); + options = &stream_it->second->options(); + } + + // This little function is used in all use-cases below: + auto update_option = [&]( std::string const & option_name, float const new_value ) + { + // Find the option and set its value + for( auto & option : *options ) { - throw std::runtime_error( "owner not found" ); + if( option->get_name() == option_name ) + { + option->set_value( new_value ); + return; + } } - options = &stream_it->second->options(); + LOG_DEBUG( "[" << debug_name() << "] option '" << option_name << "': not found" ); + }; + + rsutils::json::nested value_j( j, value_key ); + if( ! value_j.exists() ) + { + // Use case: + // Bulk query without ANY option names supplied, { "option-name": [] } + // There is no 'value' key; instead, the server returns option-value pairs in 'option-values': + rsutils::json::nested option_values( j, option_values_key ); + if( ! option_values.is_object() ) + throw std::runtime_error( "missing value or option-values" ); + for( auto it = option_values->begin(); it != option_values->end(); ++it ) + update_option( it.key(), it.value().get< float >() ); + return; } - for( auto & option : *options ) + + rsutils::json::nested option_name_j( control, option_name_key ); + if( ! option_name_j.exists() ) + throw std::runtime_error( "missing option-name" ); + + if( value_j->is_array() ) { - if( option->get_name() == option_name ) + // Use case: + // Bulk query, but specific option names were given, { "option-name": ["opt1", ...] } + // The 'option-name' should be an array of options for which values are returned + // The 'value' should be a similarly-sized array + if( ! option_name_j->is_array() ) + throw std::runtime_error( "'option-name' does not match 'value' array type" ); + if( value_j->size() != option_name_j->size() ) + throw std::runtime_error( "'option-name' does not match 'value' array size" ); + + auto size = value_j->size(); + for( auto x = 0; x < size; ++x ) { - option->set_value( new_value ); - return; + auto const & option_name = rsutils::json::string_ref( option_name_j->at( x ) ); + auto const new_value = rsutils::json::value< float >( value_j->at( x ) ); + update_option( option_name, new_value ); } + return; } - throw std::runtime_error( "option not found" ); + + // Use case: + // Simple single-option value update, { "option-name": "opt1" } + // 'option-name' should be a string + // A single 'value' is returned + if( ! option_name_j->is_string() ) + throw std::runtime_error( "option-name is not a string" ); + auto & option_name = option_name_j.string_ref(); + update_option( option_name, rsutils::json::value< float >( value_j ) ); } @@ -313,26 +354,18 @@ void dds_device::impl::on_log( nlohmann::json const & j, eprosima::fastdds::dds: { if( ! entry.is_array() ) throw std::runtime_error( "not an array" ); - if( entry.size() > 4 ) - throw std::runtime_error( "too long" ); + if( entry.size() < 3 || entry.size() > 4 ) + throw std::runtime_error( "bad array length" ); auto timestamp = time_from( rsutils::json::get< dds_nsec >( entry, 0 ) ); - auto const stype = rsutils::json::get< std::string >( entry, 1 ); + auto const & stype = rsutils::json::string_ref( entry[1] ); if( stype.length() != 1 || ! strchr( "EWID", stype[0] ) ) throw std::runtime_error( "type not one of 'EWID'" ); char const type = stype[0]; - auto const text = rsutils::json::get< std::string >( entry, 2 ); - nlohmann::json data; - if( entry.size() > 3 ) - { - data = entry.at( 3 ); - if( ! data.is_object() ) - throw std::runtime_error( "data is not an object" ); - } + auto const & text = rsutils::json::string_ref( entry[2] ); + nlohmann::json const & data = entry.size() > 3 ? entry[3] : rsutils::json::null_json; - if( _on_device_log ) - _on_device_log( timestamp, type, text, data ); - else - LOG_DEBUG( "[" << _info.debug_name() << "][" << timestr( timestamp ) << "][" << type << "] " << text + if( ! _on_device_log.raise( timestamp, type, text, data ) ) + LOG_DEBUG( "[" << debug_name() << "][" << timestr( timestamp ) << "][" << type << "] " << text << " [" << data << "]" ); } catch( std::exception const & e ) @@ -368,10 +401,6 @@ void dds_device::impl::open( const dds_stream_profiles & profiles ) nlohmann::json reply; write_control_message( j, &reply ); - - if( rsutils::json::get( reply, status_key, status_ok ) != status_ok ) - throw std::runtime_error( "failed to open stream: " - + rsutils::json::get< std::string >( reply, explanation_key, "no explanation" ) ); } @@ -390,10 +419,6 @@ void dds_device::impl::set_option_value( const std::shared_ptr< dds_option > & o nlohmann::json reply; write_control_message( j, &reply ); - - if( rsutils::json::get( reply, status_key, status_ok ) != status_ok ) - throw std::runtime_error( - rsutils::json::get< std::string >( reply, explanation_key, "failed to set-option; no explanation" ) ); //option->set_value( new_value ); } @@ -413,9 +438,6 @@ float dds_device::impl::query_option_value( const std::shared_ptr< dds_option > nlohmann::json reply; write_control_message( j, &reply ); - if( rsutils::json::get( reply, status_key, status_ok ) != status_ok ) - throw std::runtime_error( - rsutils::json::get< std::string >( reply, explanation_key, "failed to query-option; no explanation" ) ); return rsutils::json::get< float >( reply, value_key ); } @@ -442,6 +464,9 @@ void dds_device::impl::write_control_message( topics::flexible_msg && msg, nlohm //LOG_DEBUG( "got reply: " << actual_reply ); *reply = std::move( actual_reply ); _replies.erase( this_sequence_number ); + + // Throw if there's an error + dds_device::check_reply( *reply ); } } @@ -501,15 +526,16 @@ void dds_device::impl::create_metadata_reader() topics::flexible_msg message; while( topics::flexible_msg::take_next( *_metadata_reader, &message ) ) { - if( message.is_valid() && _on_metadata_available ) + if( message.is_valid() && _on_metadata_available.size() ) { try { - _on_metadata_available( std::move( message.json_data() ) ); + auto sptr = std::make_shared< const nlohmann::json >( message.json_data() ); + _on_metadata_available.raise( sptr ); } catch( std::exception const & e ) { - LOG_DEBUG( "metadata exception: " << e.what() ); + LOG_DEBUG( "[" << debug_name() << "] metadata exception: " << e.what() ); } } } @@ -542,7 +568,7 @@ void dds_device::impl::on_device_header( nlohmann::json const & j, eprosima::fas eprosima::fastrtps::rtps::iHandle2GUID( _server_guid, sample.publication_handle ); _n_streams_expected = rsutils::json::get< size_t >( j, "n-streams" ); - LOG_DEBUG( "... " << id_device_header << ": " << _n_streams_expected << " streams expected" ); + LOG_DEBUG( "[" << debug_name() << "] ... " << id_device_header << ": " << _n_streams_expected << " streams expected" ); if( rsutils::json::has( j, "extrinsics" ) ) { @@ -550,7 +576,7 @@ void dds_device::impl::on_device_header( nlohmann::json const & j, eprosima::fas { std::string from_name = rsutils::json::get< std::string >( ex, 0 ); std::string to_name = rsutils::json::get< std::string >( ex, 1 ); - LOG_DEBUG( " ... got extrinsics from " << from_name << " to " << to_name ); + LOG_DEBUG( "[" << debug_name() << "] ... got extrinsics from " << from_name << " to " << to_name ); extrinsics extr = extrinsics::from_json( rsutils::json::get< nlohmann::json >( ex, 2 ) ); _extrinsics_map[std::make_pair( from_name, to_name )] = std::make_shared< extrinsics >( extr ); } @@ -567,7 +593,7 @@ void dds_device::impl::on_device_options( nlohmann::json const & j, eprosima::fa if( rsutils::json::has( j, "options" ) ) { - LOG_DEBUG( "... " << id_device_options << ": " << j["options"].size() << " options received" ); + LOG_DEBUG( "[" << debug_name() << "] ... " << id_device_options << ": " << j["options"].size() << " options received" ); for( auto & option_json : j["options"] ) { @@ -636,7 +662,7 @@ void dds_device::impl::on_stream_header( nlohmann::json const & j, eprosima::fas "failed to instantiate stream type '" << stream_type << "' (instead, got '" << stream->type_string() << "')" ); - LOG_DEBUG( "... stream " << _streams.size() << "/" << _n_streams_expected << " '" << stream_name + LOG_DEBUG( "[" << debug_name() << "] ... stream " << _streams.size() << "/" << _n_streams_expected << " '" << stream_name << "' received with " << profiles.size() << " profiles" << ( stream->metadata_enabled() ? " and metadata" : "" ) ); diff --git a/third-party/realdds/src/dds-device-impl.h b/third-party/realdds/src/dds-device-impl.h index 655b08acdc..7fa8b14bba 100644 --- a/third-party/realdds/src/dds-device-impl.h +++ b/third-party/realdds/src/dds-device-impl.h @@ -12,6 +12,7 @@ #include +#include #include #include @@ -74,6 +75,7 @@ class dds_device::impl topics::device_info const & info ); dds_guid const & guid() const; + std::string debug_name() const; void wait_until_ready( size_t timeout_ms ); bool is_ready() const { return state_t::READY == _state; } @@ -85,16 +87,29 @@ class dds_device::impl void set_option_value( const std::shared_ptr< dds_option > & option, float new_value ); float query_option_value( const std::shared_ptr< dds_option > & option ); - typedef std::function< void( nlohmann::json && md ) > on_metadata_available_callback; - void on_metadata_available( on_metadata_available_callback cb ) { _on_metadata_available = cb; } - - typedef std::function< void( - dds_time const & timestamp, char type, std::string const & text, nlohmann::json const & data ) > - on_device_log_callback; - void on_device_log( on_device_log_callback cb ) { _on_device_log = cb; } + using on_metadata_available_signal = rsutils::signal< std::shared_ptr< const nlohmann::json > const & >; + using on_metadata_available_callback = on_metadata_available_signal::callback; + rsutils::subscription on_metadata_available( on_metadata_available_callback && cb ) + { + return _on_metadata_available.subscribe( std::move( cb ) ); + } + + using on_device_log_signal = rsutils::signal< dds_time const &, // timestamp + char, // type + std::string const &, // text + nlohmann::json const & >; // data + using on_device_log_callback = on_device_log_signal::callback; + rsutils::subscription on_device_log( on_device_log_callback && cb ) + { + return _on_device_log.subscribe( std::move( cb ) ); + } - typedef std::function< bool( std::string const &, nlohmann::json const & ) > on_notification_callback; - void on_notification( on_notification_callback cb ) { _on_notification = cb; } + using on_notification_signal = rsutils::signal< std::string const &, nlohmann::json const & >; + using on_notification_callback = on_notification_signal::callback; + rsutils::subscription on_notification( on_notification_callback && cb ) + { + return _on_notification.subscribe( std::move( cb ) ); + } private: void create_notifications_reader(); @@ -117,9 +132,9 @@ class dds_device::impl static notification_handlers const _notification_handlers; void handle_notification( nlohmann::json const &, eprosima::fastdds::dds::SampleInfo const & ); - on_metadata_available_callback _on_metadata_available; - on_device_log_callback _on_device_log; - on_notification_callback _on_notification; + on_metadata_available_signal _on_metadata_available; + on_device_log_signal _on_device_log; + on_notification_signal _on_notification; }; diff --git a/third-party/realdds/src/dds-device-server.cpp b/third-party/realdds/src/dds-device-server.cpp index 4749e6f6b2..f3507eae94 100644 --- a/third-party/realdds/src/dds-device-server.cpp +++ b/third-party/realdds/src/dds-device-server.cpp @@ -36,6 +36,7 @@ static std::string const id_key( "id", 2 ); static std::string const id_set_option( "set-option", 10 ); static std::string const id_query_option( "query-option", 12 ); static std::string const value_key( "value", 5 ); +static std::string const option_values_key( "option-values", 13 ); static std::string const sample_key( "sample", 6 ); static std::string const status_key( "status", 6 ); static std::string const status_ok( "ok", 2 ); @@ -360,33 +361,72 @@ void dds_device_server::handle_set_option( const nlohmann::json & j, nlohmann::j void dds_device_server::handle_query_option( const nlohmann::json & j, nlohmann::json & reply ) { - auto option_name = rsutils::json::get< std::string >( j, option_name_key ); std::string stream_name; // default is empty, for a device option rsutils::json::get_ex( j, stream_name_key, &stream_name ); - std::shared_ptr< dds_option > opt = find_option( option_name, stream_name ); - if( opt ) + auto query_option = [&]( std::shared_ptr< dds_option > const & option ) { float value; if( _query_option_callback ) { - value = _query_option_callback( opt ); + value = _query_option_callback( option ); // Ensure realdds option is up to date with actual value from callback - opt->set_value( value ); + option->set_value( value ); } else { - value = opt->get_value(); + value = option->get_value(); } - reply[value_key] = value; - } - else + return value; + }; + auto query_option_j = [&]( rsutils::json::nested const & j ) { + if( ! j->is_string() ) + DDS_THROW( runtime_error, "option name should be a string; got " << j ); + std::string const & option_name = j.string_ref(); + std::shared_ptr< dds_option > option = find_option( option_name, stream_name ); + if( option ) + return query_option( option ); + if( stream_name.empty() ) stream_name = "device"; else stream_name = "'" + stream_name + "'"; DDS_THROW( runtime_error, stream_name + " option '" + option_name + "' not found" ); + }; + + rsutils::json::nested option_name( j, option_name_key ); + if( option_name.is_array() ) + { + if( option_name->empty() ) + { + // Query all options and return in option:value object + nlohmann::json & option_values = reply[option_values_key] = nlohmann::json::object(); + if( stream_name.empty() ) + { + for( auto const & option : _options ) + option_values[option->get_name()] = query_option( option ); + } + else + { + auto stream_it = _stream_name_to_server.find( stream_name ); + if( stream_it != _stream_name_to_server.end() ) + { + for( auto const & option : stream_it->second->options() ) + option_values[option->get_name()] = query_option( option ); + } + } + } + else + { + nlohmann::json & value = reply[value_key]; + for( auto x = 0; x < option_name->size(); ++x ) + value.push_back( query_option_j( option_name->at( x ) ) ); + } + } + else + { + reply[value_key] = query_option_j( option_name ); } } diff --git a/third-party/realdds/src/dds-device.cpp b/third-party/realdds/src/dds-device.cpp index 84902a4823..a67318b8a7 100644 --- a/third-party/realdds/src/dds-device.cpp +++ b/third-party/realdds/src/dds-device.cpp @@ -6,6 +6,8 @@ #include #include "dds-device-impl.h" +#include + namespace realdds { @@ -13,7 +15,7 @@ namespace realdds { dds_device::dds_device( std::shared_ptr< dds_participant > const & participant, topics::device_info const & info ) : _impl( std::make_shared< dds_device::impl >( participant, info ) ) { - LOG_DEBUG( "+device '" << info.debug_name() << "' on " << info.topic_root ); + LOG_DEBUG( "+device '" << _impl->debug_name() << "' on " << info.topic_root ); } @@ -117,19 +119,60 @@ bool dds_device::supports_metadata() const return !! _impl->_metadata_reader; } -void dds_device::on_metadata_available( on_metadata_available_callback cb ) +rsutils::subscription dds_device::on_metadata_available( on_metadata_available_callback && cb ) { - _impl->on_metadata_available( cb ); + return _impl->on_metadata_available( std::move( cb ) ); } -void dds_device::on_device_log( on_device_log_callback cb ) +rsutils::subscription dds_device::on_device_log( on_device_log_callback && cb ) { - _impl->on_device_log( cb ); + return _impl->on_device_log( std::move( cb ) ); } -void dds_device::on_notification( on_notification_callback cb ) +rsutils::subscription dds_device::on_notification( on_notification_callback && cb ) { - _impl->on_notification( cb ); + return _impl->on_notification( std::move( cb ) ); +} + + +static std::string const status_key( "status", 6 ); +static std::string const status_ok( "ok", 2 ); +static std::string const explanation_key( "explanation", 11 ); +static std::string const id_key( "id", 2 ); + + +bool dds_device::check_reply( nlohmann::json const & reply, std::string * p_explanation ) +{ + auto status_j = rsutils::json::nested( reply, status_key ); + if( ! status_j ) + return true; + std::ostringstream os; + if( ! status_j->is_string() ) + os << "bad status " << status_j; + else if( status_j.string_ref() == status_ok ) + return true; + else + { + os << "["; + if( auto id = rsutils::json::nested( reply, id_key ) ) + { + if( id->is_string() ) + os << "\"" << id.string_ref() << "\" "; + } + os << status_j.string_ref() << "]"; + if( auto explanation_j = rsutils::json::nested( reply, explanation_key ) ) + { + os << ' '; + if( ! explanation_j->is_string() || explanation_j.string_ref().empty() ) + os << "bad explanation " << explanation_j; + else + os << explanation_j.string_ref(); + } + } + if( ! p_explanation ) + DDS_THROW( runtime_error, os.str() ); + *p_explanation = os.str(); + return false; } diff --git a/third-party/realdds/src/dds-metadata-syncer.cpp b/third-party/realdds/src/dds-metadata-syncer.cpp index 22dd79abea..012ddabf49 100644 --- a/third-party/realdds/src/dds-metadata-syncer.cpp +++ b/third-party/realdds/src/dds-metadata-syncer.cpp @@ -37,9 +37,7 @@ void dds_metadata_syncer::enqueue_frame( key_type id, frame_holder && frame ) std::unique_lock< std::mutex > lock( _queues_lock ); // Expect increasing order if( ! _frame_queue.empty() && _frame_queue.back().first >= id ) - DDS_THROW( runtime_error, - "frame " + std::to_string( id ) + " cannot be enqueued after " - + std::to_string( _frame_queue.back().first ) ); + DDS_THROW( runtime_error, "frame " << id << " cannot be enqueued after " << _frame_queue.back().first ); // We must push the new one before releasing the lock, else someone else may push theirs ahead of ours _frame_queue.push_back( key_frame{ id, std::move( frame ) } ); @@ -52,21 +50,19 @@ void dds_metadata_syncer::enqueue_frame( key_type id, frame_holder && frame ) } -void dds_metadata_syncer::enqueue_metadata( key_type id, metadata_type && md ) +void dds_metadata_syncer::enqueue_metadata( key_type id, metadata_type const & md ) { std::weak_ptr< bool > alive = _is_alive; - if( !alive.lock() ) // Check if was destructed by another thread + if( ! alive.lock() ) // Check if was destructed by another thread return; std::unique_lock< std::mutex > lock( _queues_lock ); // Expect increasing order if( ! _metadata_queue.empty() && _metadata_queue.back().first >= id ) - DDS_THROW( runtime_error, - "metadata " + std::to_string( id ) + " cannot be enqueued after " - + std::to_string( _metadata_queue.back().first ) ); + DDS_THROW( runtime_error, "metadata " << id << " cannot be enqueued after " << _metadata_queue.back().first ); // We must push the new one before releasing the lock, else someone else may push theirs ahead of ours - _metadata_queue.push_back( key_metadata{ id, std::move( md ) } ); + _metadata_queue.push_back( key_metadata{ id, md } ); while( _metadata_queue.size() > max_md_queue_size ) if( ! drop_metadata( lock ) ) // Lock released and aquired around callbacks, check we are alive @@ -79,7 +75,7 @@ void dds_metadata_syncer::enqueue_metadata( key_type id, metadata_type && md ) void dds_metadata_syncer::search_for_match( std::unique_lock< std::mutex > & lock ) { // Wait for frame + metadata set - while( !_frame_queue.empty() && !_metadata_queue.empty() ) + while( ! _frame_queue.empty() && ! _metadata_queue.empty() ) { // We're looking for metadata with the same ID as the next frame auto const frame_key = _frame_queue.front().first; @@ -88,18 +84,18 @@ void dds_metadata_syncer::search_for_match( std::unique_lock< std::mutex > & loc if( frame_key < md_key ) { // Newer metadata: we can release the frame - if( !handle_frame_without_metadata( lock ) ) + if( ! handle_frame_without_metadata( lock ) ) return; } else if( frame_key == md_key ) { - if( !handle_match( lock ) ) + if( ! handle_match( lock ) ) return; } else { // Throw away any old metadata (with ID < the frame) since the frame ID will keep increasing - if( !drop_metadata( lock ) ) + if( ! drop_metadata( lock ) ) return; } } @@ -118,8 +114,8 @@ bool dds_metadata_syncer::handle_match( std::unique_lock< std::mutex > & lock ) if( _on_frame_ready ) { lock.unlock(); - _on_frame_ready( std::move( fh ), std::move( md ) ); - if( !alive.lock() ) // Check if was destructed by another thread during callback + _on_frame_ready( std::move( fh ), md ); + if( ! alive.lock() ) // Check if was destructed by another thread during callback return false; lock.lock(); } @@ -138,9 +134,8 @@ bool dds_metadata_syncer::handle_frame_without_metadata( std::unique_lock< std:: if( _on_frame_ready ) { lock.unlock(); - metadata_type md; - _on_frame_ready( std::move( fh ), std::move( md ) ); - if( !alive.lock() ) // Check if was destructed by another thread during callback + _on_frame_ready( std::move( fh ), metadata_type() ); + if( ! alive.lock() ) // Check if was destructed by another thread during callback return false; lock.lock(); } @@ -159,8 +154,8 @@ bool dds_metadata_syncer::drop_metadata( std::unique_lock< std::mutex > & lock ) if( _on_metadata_dropped ) { lock.unlock(); - _on_metadata_dropped( key, std::move( md ) ); - if( !alive.lock() ) // Check if was destructed by another thread during callback + _on_metadata_dropped( key, md ); + if( ! alive.lock() ) // Check if was destructed by another thread during callback return false; lock.lock(); } diff --git a/third-party/realdds/src/dds-participant.cpp b/third-party/realdds/src/dds-participant.cpp index f286228787..55819b4f6c 100644 --- a/third-party/realdds/src/dds-participant.cpp +++ b/third-party/realdds/src/dds-participant.cpp @@ -228,10 +228,10 @@ void dds_participant::init( dds_domain_id domain_id, std::string const & partici // Above are defaults override_participant_qos_from_json( pqos, settings ); - // Listener will call DataReaderListener::on_data_available for a specific reader, - // not SubscriberListener::on_data_on_readers for any reader - // ( See note on https://fast-dds.docs.eprosima.com/en/v2.7.0/fastdds/dds_layer/core/entity/entity.html ) - StatusMask par_mask = StatusMask::all() >> StatusMask::data_on_readers(); + // NOTE: the listener callbacks we use are all specific to FastDDS and so are always enabled: + // https://fast-dds.docs.eprosima.com/en/latest/fastdds/dds_layer/core/entity/entity.html#listener + // We need none of the standard callbacks at this level: these can be enabled on a per-reader/-writer basis! + StatusMask const par_mask = StatusMask::none(); _participant_factory = DomainParticipantFactory::get_shared_instance(); _participant = DDS_API_CALL( _participant_factory->create_participant( domain_id, pqos, _domain_listener.get(), par_mask ) ); diff --git a/third-party/realdds/src/dds-topic-writer.cpp b/third-party/realdds/src/dds-topic-writer.cpp index 62722412bb..f18d0445c7 100644 --- a/third-party/realdds/src/dds-topic-writer.cpp +++ b/third-party/realdds/src/dds-topic-writer.cpp @@ -14,6 +14,7 @@ #include #include +#include #include @@ -30,6 +31,7 @@ dds_topic_writer::dds_topic_writer( std::shared_ptr< dds_topic > const & topic, std::shared_ptr< dds_publisher > const & publisher ) : _topic( topic ) , _publisher( publisher ) + , _n_readers( 0 ) { } @@ -109,6 +111,21 @@ void dds_topic_writer::run( qos const & wqos ) } +bool dds_topic_writer::wait_for_readers( dds_time timeout ) +{ + // Better to use on_publication_matched, but that would require additional data members etc. + // For now, keep it simple: + rsutils::time::timer timer( std::chrono::nanoseconds( timeout.to_ns() ) ); + while( _n_readers.load() < 1 ) + { + if( timer.has_expired() ) + return false; + std::this_thread::sleep_for( std::chrono::milliseconds( 250 ) ); + } + return true; +} + + bool dds_topic_writer::wait_for_acks( dds_time timeout ) { return !! _writer->wait_for_acknowledgments( timeout ); diff --git a/third-party/rsutils/include/rsutils/json.h b/third-party/rsutils/include/rsutils/json.h index d208332ade..c33c0b1e43 100644 --- a/third-party/rsutils/include/rsutils/json.h +++ b/third-party/rsutils/include/rsutils/json.h @@ -163,6 +163,10 @@ class nested nlohmann::json const & get() const { return _pj ? *_pj : null_json; } operator nlohmann::json const & () const { return get(); } + bool is_array() const { return exists() && _pj->is_array(); } + bool is_object() const { return exists() && _pj->is_object(); } + bool is_string() const { return exists() && _pj->is_string(); } + // Get the JSON as a value template< class T > T value() { return json::value< T >( get() ); } // Get the JSON as a value, or a default if not there diff --git a/third-party/rsutils/include/rsutils/signal.h b/third-party/rsutils/include/rsutils/signal.h index c798ef4ac9..7d2bfb717a 100644 --- a/third-party/rsutils/include/rsutils/signal.h +++ b/third-party/rsutils/include/rsutils/signal.h @@ -88,22 +88,27 @@ class signal } ); } - void raise( Args... args ) + bool raise( Args... args ) { + std::unique_lock< std::mutex > locker( _impl->mutex ); + auto const N = _impl->subscribers.size(); + if( ! N ) + return false; // no notifications generated + std::vector< callback > functions; + functions.reserve( N ); + for( auto const & s : _impl->subscribers ) + functions.push_back( s.second ); - { - std::lock_guard< std::mutex > locker( _impl->mutex ); - functions.reserve( _impl->subscribers.size() ); - for( auto const & s : _impl->subscribers ) - functions.push_back( s.second ); - } + locker.unlock(); // NOTE: when calling our subscribers, we do not perfectly forward on purpose to avoid the situation where the // first subscriber will move an argument and the second will then get nothing! // for( auto const & func : functions ) func( /*std::forward< Args >(*/ args /*)*/... ); + + return true; // notifications went out } // How many subscriptions are active diff --git a/unit-tests/dds/options-server.py b/unit-tests/dds/options-server.py deleted file mode 100644 index 713a38bf82..0000000000 --- a/unit-tests/dds/options-server.py +++ /dev/null @@ -1,90 +0,0 @@ -# License: Apache 2.0. See LICENSE file in root directory. -# Copyright(c) 2022 Intel Corporation. All Rights Reserved. - -import pyrealdds as dds -from rspy import log, test - -dds.debug( log.is_debug_on(), log.nested ) - - -participant = dds.participant() -participant.init( 123, "options-server" ) - - -def test_no_options(): - # Create one stream with one profile so device init won't fail - # No device options, no stream options - s1p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 ) - s1profiles = [s1p1] - s1 = dds.depth_stream_server( "s1", "sensor" ) - s1.init_profiles( s1profiles, 0 ) - dev_opts = [] - global server - server = dds.device_server( participant, "realdds/device/topic-root" ) - server.init( [s1], dev_opts, {} ) - -def test_device_options_discovery( values ): - # Create one stream with one profile so device init won't fail, no stream options - s1p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 ) - s1profiles = [s1p1] - s1 = dds.depth_stream_server( "s1", "sensor" ) - s1.init_profiles( s1profiles, 0 ) - dev_opts = [] - for index, value in enumerate( values ): - option = dds.option( f'opt{index}', dds.option_range( value, value, 0, value ), f'opt{index} description' ) - dev_opts.append( option ) - global server - server = dds.device_server( participant, "realdds/device/topic-root" ) - server.init( [s1], dev_opts, {}) - -def test_stream_options_discovery( value, min, max, step, default, description ): - s1p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 ) - s1profiles = [s1p1] - s1 = dds.depth_stream_server( "s1", "sensor" ) - s1.init_profiles( s1profiles, 0 ) - so1 = dds.option( "opt1", dds.option_range( value, value, 0, value ), "opt1 is const" ) - so1.set_value( value ) - so2 = dds.option( "opt2", dds.option_range( min, max, step, default ), "opt2 with range" ) - so3 = dds.option( "opt3", dds.option_range( 0, 1, 0.05, 0.15 ), description ) - s1.init_options( [so1, so2, so3] ) - global server - server = dds.device_server( participant, "realdds/device/topic-root" ) - server.init( [s1], [], {} ) - -def test_device_and_multiple_stream_options_discovery( dev_values, stream_values ): - dev_options = [] - for index, value in enumerate( dev_values ): - option = dds.option( f'opt{index}', dds.option_range( value, value, 0, value ), f'opt{index} description' ) - dev_options.append( option ) - - s1p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 ) - s1profiles = [s1p1] - s1 = dds.depth_stream_server( "s1", "sensor" ) - s1.init_profiles( s1profiles, 0 ) - stream_options = [] - for index, value in enumerate( stream_values ): - option = dds.option( f'opt{index}', dds.option_range( value, value, 0, value ), f'opt{index} description' ) - stream_options.append( option ) - s1.init_options( stream_options ) - - s2p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 ) - s2profiles = [s2p1] - s2 = dds.depth_stream_server( "s2", "sensor" ) - s2.init_profiles( s2profiles, 0 ) - stream_options = [] - for index, value in enumerate( stream_values ): - option = dds.option( f'opt{index}', dds.option_range( value, value, 0, value ), f'opt{index} description' ) - stream_options.append( option ) - s2.init_options( stream_options ) - - global server - server = dds.device_server( participant, "realdds/device/topic-root" ) - server.init( [s1, s2], dev_options, {} ) - -def close_server(): - global server - server = None - - -# From here down, we're in "interactive" mode (see test-device-init.py) -# ... diff --git a/unit-tests/dds/test-control-reply.py b/unit-tests/dds/test-control-reply.py index acbeffc56a..7de2cf9506 100644 --- a/unit-tests/dds/test-control-reply.py +++ b/unit-tests/dds/test-control-reply.py @@ -2,7 +2,7 @@ # Copyright(c) 2023 Intel Corporation. All Rights Reserved. #test:donotrun:!dds -#test:retries 2 +#test:retries:gha 2 from rspy import log, test import pyrealdds as dds @@ -81,8 +81,7 @@ def _on_notification( device, id, notification ): reply_count[device.guid()] += 1 else: log.d( f'notification to {device}' ) - return True # otherwise the notification will be flagged as unhandled - device.on_notification( _on_notification ) + notification_subscription = device.on_notification( _on_notification ) with test.closure( 'Send a notification that is not a reply' ): dev1_notifications = notification_count[device.guid()] @@ -117,7 +116,7 @@ def control( device, json, n=1 ): device2 = dds.device( participant, device_info ) notification_count[device2.guid()] = 0 reply_count[device2.guid()] = 0 - device2.on_notification( _on_notification ) + notification2_subscription = device2.on_notification( _on_notification ) device2.wait_until_ready() with test.closure( 'Controls generate notifications to all devices' ): diff --git a/unit-tests/dds/test-librs-connections.py b/unit-tests/dds/test-librs-connections.py index 95aa85c150..83e475c906 100644 --- a/unit-tests/dds/test-librs-connections.py +++ b/unit-tests/dds/test-librs-connections.py @@ -2,7 +2,7 @@ # Copyright(c) 2022 Intel Corporation. All Rights Reserved. #test:donotrun:!dds -#test:retries 2 +#test:retries:gha 2 from rspy import log, test log.nested = 'C ' @@ -30,7 +30,7 @@ remote.run( 'instance2 = broadcast_device( d405, d405.device_info )' ) # Create context after remote device is ready to test discovery on start-up - context = rs.context( { 'dds': { 'domain': 123, 'participant': 'librs' }} ) + context = rs.context( { 'dds': { 'enabled': True, 'domain': 123, 'participant': 'librs' }} ) # The DDS devices take time to be recognized and we just created the context; we # should not see them yet! #test.check( len( context.query_devices( only_sw_devices )) != 2 ) diff --git a/unit-tests/dds/test-librs-context.py b/unit-tests/dds/test-librs-context.py index 753fe30450..1ca3bdfdd3 100644 --- a/unit-tests/dds/test-librs-context.py +++ b/unit-tests/dds/test-librs-context.py @@ -16,14 +16,14 @@ test.start( "Multiple participants on the same domain should fail" ) try: contexts = [] - contexts.append( rs.context( { 'dds': { 'domain': 124, 'participant': 'context1' }} )) + contexts.append( rs.context( { 'dds': { 'enabled': True, 'domain': 124, 'participant': 'context1' }} )) # another context, same domain and name -> OK - contexts.append( rs.context( { 'dds': { 'domain': 124, 'participant': 'context1' }} )) + contexts.append( rs.context( { 'dds': { 'enabled': True, 'domain': 124, 'participant': 'context1' }} )) # without a name -> pick up the name from the existing participant (default is "librealsense") - contexts.append( rs.context( { 'dds': { 'domain': 124 }} )) + contexts.append( rs.context( { 'dds': { 'enabled': True, 'domain': 124 }} )) # same name, different domain -> different participant; should be OK: - contexts.append( rs.context( { 'dds': { 'domain': 125, 'participant': 'context1' }} )) - test.check_throws( lambda: rs.context( { 'dds': { 'domain': 124, 'participant': 'context2' }} ), + contexts.append( rs.context( { 'dds': { 'enabled': True, 'domain': 125, 'participant': 'context1' }} )) + test.check_throws( lambda: rs.context( { 'dds': { 'enabled': True, 'domain': 124, 'participant': 'context2' }} ), RuntimeError, "A DDS participant 'context1' already exists in domain 124; cannot create 'context2'" ) except: test.unexpected_exception() diff --git a/unit-tests/dds/test-librs-device-properties.py b/unit-tests/dds/test-librs-device-properties.py index 23f22f922a..911de72867 100644 --- a/unit-tests/dds/test-librs-device-properties.py +++ b/unit-tests/dds/test-librs-device-properties.py @@ -17,7 +17,7 @@ rs.log_to_console( rs.log_severity.debug ) from time import sleep -context = rs.context( { 'dds': { 'domain': 123, 'participant': 'device-properties-client' }} ) +context = rs.context( { 'dds': { 'enabled': True, 'domain': 123, 'participant': 'device-properties-client' }} ) only_sw_devices = int(rs.product_line.sw_only) | int(rs.product_line.any_intel) diff --git a/unit-tests/dds/test-librs-extrinsics.py b/unit-tests/dds/test-librs-extrinsics.py index e95b7bd095..12a99e16b7 100644 --- a/unit-tests/dds/test-librs-extrinsics.py +++ b/unit-tests/dds/test-librs-extrinsics.py @@ -2,6 +2,7 @@ # Copyright(c) 2022 Intel Corporation. All Rights Reserved. #test:donotrun:!dds +#test:retries:gha 2 from rspy import log, test log.nested = 'C ' @@ -12,7 +13,7 @@ rs.log_to_console( rs.log_severity.debug ) from time import sleep -context = rs.context( { 'dds': { 'domain': 123 }} ) +context = rs.context( { 'dds': { 'enabled': True, 'domain': 123 }} ) only_sw_devices = int(rs.product_line.sw_only) | int(rs.product_line.any_intel) import os.path diff --git a/unit-tests/dds/test-librs-formats-conversion.py b/unit-tests/dds/test-librs-formats-conversion.py index b582bdc1e9..0e57086e5a 100644 --- a/unit-tests/dds/test-librs-formats-conversion.py +++ b/unit-tests/dds/test-librs-formats-conversion.py @@ -12,7 +12,7 @@ rs.log_to_console( rs.log_severity.debug ) log.nested = 'C ' -context = rs.context( { 'dds': { 'domain': 123, 'participant': 'test-formats-conversion' }} ) +context = rs.context( { 'dds': { 'enabled': True, 'domain': 123, 'participant': 'test-formats-conversion' }} ) only_sw_devices = int(rs.product_line.sw_only) | int(rs.product_line.any_intel) import os.path diff --git a/unit-tests/dds/test-librs-intrinsics.py b/unit-tests/dds/test-librs-intrinsics.py index 0847f3fa85..e1bb00e87f 100644 --- a/unit-tests/dds/test-librs-intrinsics.py +++ b/unit-tests/dds/test-librs-intrinsics.py @@ -2,6 +2,7 @@ # Copyright(c) 2022 Intel Corporation. All Rights Reserved. #test:donotrun:!dds +#test:retries:gha 2 from rspy import log, test log.nested = 'C ' @@ -12,7 +13,7 @@ rs.log_to_console( rs.log_severity.debug ) from time import sleep -context = rs.context( { 'dds': { 'domain': 123 }} ) +context = rs.context( { 'dds': { 'enabled': True, 'domain': 123 }} ) only_sw_devices = int(rs.product_line.sw_only) | int(rs.product_line.any_intel) import os.path diff --git a/unit-tests/dds/test-librs-options.py b/unit-tests/dds/test-librs-options.py index c73328fc66..c0f782855f 100644 --- a/unit-tests/dds/test-librs-options.py +++ b/unit-tests/dds/test-librs-options.py @@ -44,7 +44,7 @@ from dds import wait_for_devices with test.closure( 'Initialize librealsense context', on_fail=test.ABORT ): - context = rs.context( { 'dds': { 'domain': 123, 'participant': 'client' }} ) + context = rs.context( { 'dds': { 'enabled': True, 'domain': 123, 'participant': 'client' }} ) only_sw_devices = int(rs.product_line.sw_only) | int(rs.product_line.any) with test.closure( 'Find the server', on_fail=test.ABORT ): diff --git a/unit-tests/dds/test-metadata.py b/unit-tests/dds/test-metadata.py index 14796829d7..73909bd3ee 100644 --- a/unit-tests/dds/test-metadata.py +++ b/unit-tests/dds/test-metadata.py @@ -89,7 +89,7 @@ def on_metadata_available( device, md ): metadata_content.append( md ) metadata_received.set() - device_direct.on_metadata_available( on_metadata_available ) + metadata_subscription = device_direct.on_metadata_available( on_metadata_available ) def detect_metadata(): metadata_content.clear() @@ -141,7 +141,7 @@ def __exit__( self, type, value, traceback ): if log.is_debug_on(): rs.log_to_console( rs.log_severity.debug ) from dds import wait_for_devices - context = rs.context( { 'dds': { 'domain': 123, 'participant': 'librs' }} ) + context = rs.context( { 'dds': { 'enabled': True, 'domain': 123, 'participant': 'librs' }} ) only_sw_devices = int(rs.product_line.sw_only) | int(rs.product_line.any_intel) device = wait_for_devices( context, only_sw_devices, n=1. ) sensors = device.sensors diff --git a/unit-tests/dds/test-no-metadata.py b/unit-tests/dds/test-no-metadata.py index 5418823fe1..6279a1be58 100644 --- a/unit-tests/dds/test-no-metadata.py +++ b/unit-tests/dds/test-no-metadata.py @@ -6,7 +6,6 @@ import pyrealdds as dds from rspy import log, test dds.debug( log.is_debug_on() ) -from time import sleep participant = dds.participant() participant.init( 123, "test-no-metadata" ) @@ -29,16 +28,16 @@ def on_metadata_available( device, md ): log.d( f'-----> {md}') -device.on_metadata_available( on_metadata_available ) +metadata_subscription = device.on_metadata_available( on_metadata_available ) + ############################################################################################# -# -test.start( "publish_metadata should be impossible" ) -md = { 'stream-name' : 'Color', 'invalid-metadata' : True } -test.check_throws( lambda: - device_server.publish_metadata( md ), - RuntimeError, "device 'realsense/D435I_036522070660' has no stream with enabled metadata" ) -test.finish() -# +with test.closure( "publish_metadata should be impossible" ): + md = { 'stream-name' : 'Color', 'invalid-metadata' : True } + test.check_throws( lambda: + device_server.publish_metadata( md ), + RuntimeError, "device 'realsense/D435I_036522070660' has no stream with enabled metadata" ) + + ############################################################################################# test.print_results_and_exit() diff --git a/unit-tests/dds/test-options.py b/unit-tests/dds/test-options.py index b01c62ecb4..57a35aa838 100644 --- a/unit-tests/dds/test-options.py +++ b/unit-tests/dds/test-options.py @@ -2,31 +2,111 @@ # Copyright(c) 2022 Intel Corporation. All Rights Reserved. #test:donotrun:!dds +#test:retries:gha 2 import pyrealdds as dds from rspy import log, test - -dds.debug( log.is_debug_on(), 'C ' ) -log.nested = 'C ' - - -participant = dds.participant() -participant.init( 123, "test-options" ) - info = dds.message.device_info() info.name = "Test Device" -info.topic_root = "realdds/device/topic-root" +info.topic_root = "realsense/options-device" + +with test.remote.fork( nested_indent=None ) as remote: + if remote is None: # we're the fork + + dds.debug( log.is_debug_on(), log.nested ) + + participant = dds.participant() + participant.init( 123, "server" ) + + + def test_no_options(): + # Create one stream with one profile so device init won't fail + # No device options, no stream options + s1p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 ) + s1profiles = [s1p1] + s1 = dds.depth_stream_server( "s1", "sensor" ) + s1.init_profiles( s1profiles, 0 ) + dev_opts = [] + global server + server = dds.device_server( participant, info.topic_root ) + server.init( [s1], dev_opts, {} ) + + def test_device_options_discovery( values ): + # Create one stream with one profile so device init won't fail, no stream options + s1p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 ) + s1profiles = [s1p1] + s1 = dds.depth_stream_server( "s1", "sensor" ) + s1.init_profiles( s1profiles, 0 ) + dev_opts = [] + for index, value in enumerate( values ): + option = dds.option( f'opt{index}', dds.option_range( value, value, 0, value ), f'opt{index} description' ) + dev_opts.append( option ) + global server + server = dds.device_server( participant, info.topic_root ) + server.init( [s1], dev_opts, {}) + + def test_stream_options_discovery( value, min, max, step, default, description ): + s1p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 ) + s1profiles = [s1p1] + s1 = dds.depth_stream_server( "s1", "sensor" ) + s1.init_profiles( s1profiles, 0 ) + so1 = dds.option( "opt1", dds.option_range( value, value, 0, value ), "opt1 is const" ) + so1.set_value( value ) + so2 = dds.option( "opt2", dds.option_range( min, max, step, default ), "opt2 with range" ) + so3 = dds.option( "opt3", dds.option_range( 0, 1, 0.05, 0.15 ), description ) + s1.init_options( [so1, so2, so3] ) + global server + server = dds.device_server( participant, info.topic_root ) + server.init( [s1], [], {} ) + + def test_device_and_multiple_stream_options_discovery( dev_values, stream_values ): + dev_options = [] + for index, value in enumerate( dev_values ): + option = dds.option( f'opt{index}', dds.option_range( value, value, 0, value ), f'opt{index} description' ) + dev_options.append( option ) + + s1p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 ) + s1profiles = [s1p1] + s1 = dds.depth_stream_server( "s1", "sensor" ) + s1.init_profiles( s1profiles, 0 ) + stream_options = [] + for index, value in enumerate( stream_values ): + option = dds.option( f'opt{index}', dds.option_range( value, value, 0, value ), f'opt{index} description' ) + stream_options.append( option ) + s1.init_options( stream_options ) + + s2p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 ) + s2profiles = [s2p1] + s2 = dds.depth_stream_server( "s2", "sensor" ) + s2.init_profiles( s2profiles, 0 ) + stream_options = [] + for index, value in enumerate( stream_values ): + option = dds.option( f'opt{index}', dds.option_range( value, value, 0, value ), f'opt{index} description' ) + stream_options.append( option ) + s2.init_options( stream_options ) + + global server + server = dds.device_server( participant, info.topic_root ) + server.init( [s1, s2], dev_options, {} ) + + def close_server(): + global server + server = None + + raise StopIteration() # exit the 'with' statement + + + ############################################################################################################### + # + dds.debug( log.is_debug_on(), 'C ' ) + log.nested = 'C ' + + participant = dds.participant() + participant.init( 123, "test-options" ) -import os.path -cwd = os.path.dirname(os.path.realpath(__file__)) -remote_script = os.path.join( cwd, 'options-server.py' ) -with test.remote( remote_script, nested_indent=" S" ) as remote: - remote.wait_until_ready() - # ############################################################################################# - # with test.closure( "Test no options" ): remote.run( 'test_no_options()' ) device = dds.device( participant, info ) @@ -44,9 +124,9 @@ remote.run( 'close_server()' ) device = None - # + + ############################################################################################# - # with test.closure( "Test device options discovery" ): test_values = list(range(17)) remote.run( 'test_device_options_discovery(' + str( test_values ) + ')' ) @@ -67,9 +147,9 @@ remote.run( 'close_server()' ) device = None - # + + ############################################################################################# - # with test.closure( "Test stream options discovery" ): #send values to be checked later as string parameter to the function remote.run( 'test_stream_options_discovery(1, 0, 123456, 123, 12, "opt3 of s1")' ) @@ -98,9 +178,9 @@ remote.run( 'close_server()' ) device = None - # + + ############################################################################################# - # with test.closure( "Test device and multiple stream options discovery" ): test_values = list(range(5)) remote.run( 'test_device_and_multiple_stream_options_discovery(' + str( test_values ) + ', ' + str( test_values ) + ')' ) @@ -120,9 +200,8 @@ remote.run( 'close_server()' ) device = None - # - ############################################################################################# + participant = None -participant = None -test.print_results_and_exit() +############################################################################################# +test.print_results() diff --git a/unit-tests/dds/test-query-option.py b/unit-tests/dds/test-query-option.py new file mode 100644 index 0000000000..616bb5af3f --- /dev/null +++ b/unit-tests/dds/test-query-option.py @@ -0,0 +1,97 @@ +# License: Apache 2.0. See LICENSE file in root directory. +# Copyright(c) 2023 Intel Corporation. All Rights Reserved. + +#test:donotrun:!dds +#test:retries:gha 2 + +from rspy import log, test +import pyrealdds as dds +dds.debug( log.is_debug_on() ) + +device_info = dds.message.device_info() +device_info.topic_root = 'server/device' + +with test.remote.fork( nested_indent=None ) as remote: + if remote is None: # we're the fork + + with test.closure( 'Start the server participant' ): + participant = dds.participant() + participant.init( 123, 'server' ) + + with test.closure( 'Create the server' ): + device_info.name = 'Some device' + s1p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 ) + s1profiles = [s1p1] + s1 = dds.color_stream_server( 's1', 'sensor' ) + s1.init_profiles( s1profiles, 0 ) + s1.init_options( [ + dds.option( 'Backlight Compensation', dds.option_range( 0, 1, 1, 0 ), 'Backlight custom description' ), + dds.option( 'Custom Option', dds.option_range( 0, 1, 0.1, 0.5 ), 'Something' ), + dds.option( 'Option 3', dds.option_range( 0, 50, 1, 25 ), 'Something Else' ) + ] ) + server = dds.device_server( participant, device_info.topic_root ) + server.init( [s1], [], {} ) + + raise StopIteration() # exit the 'with' statement + + + ############################################################################################################### + # The client is a device from which we send controls + # + from dds import wait_for_devices + + with test.closure( 'Start the client participant' ): + participant = dds.participant() + participant.init( 123, 'client' ) + + with test.closure( 'Wait for the device' ): + device_info.name = 'Device1' + device = dds.device( participant, device_info ) + device.wait_until_ready() + + with test.closure( 'Query single option, option-name: "Option 3"' ): + reply = device.send_control( { + 'id': 'query-option', + 'stream-name': 's1', + 'option-name': 'Option 3' + }, True ) # Wait for reply + test.info( 'reply', reply ) + test.check_equal( reply.get( 'value' ), 25. ) + + with test.closure( 'Option names are case-sensitive' ): + test.check_throws( lambda: + device.send_control( { + 'id': 'query-option', + 'stream-name': 's1', + 'option-name': 'custom option' + }, True ), # Wait for reply + RuntimeError, + '["query-option" error] \'s1\' option \'custom option\' not found' ) + + with test.closure( 'Query all options, option-name:[]' ): + reply = device.send_control( { + 'id': 'query-option', + 'stream-name': 's1', + 'option-name': [] # get all options + }, True ) # Wait for reply + test.info( 'reply', reply ) + values = reply.get( 'option-values' ) + if test.check( values ): + test.check_equal( len(values), 3 ) + test.check_equal( type(values), dict ) + + with test.closure( 'Query multiple options, option-name:["Option 3","Custom Option"]' ): + reply = device.send_control( { + 'id': 'query-option', + 'stream-name': 's1', + 'option-name': ['Option 3', 'Custom Option'] + }, True ) # Wait for reply + test.info( 'reply', reply ) + values = reply.get( 'value' ) + if test.check( values ): + test.check_equal( len(values), 2 ) + test.check_equal( type(values), list ) + + device = None + +test.print_results()