-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Streaming tests using flexible-msg #11102
Changes from all commits
92cb4f3
0de74e8
18f81e6
207d348
704d531
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ Copyright(c) 2022 Intel Corporation. All Rights Reserved. */ | |
#include <realdds/dds-participant.h> | ||
#include <realdds/topics/dds-topics.h> | ||
#include <realdds/topics/device-info/deviceInfoPubSubTypes.h> | ||
#include <realdds/topics/flexible/flexiblePubSubTypes.h> | ||
#include <realdds/dds-device-broadcaster.h> | ||
#include <realdds/dds-device-server.h> | ||
#include <realdds/dds-stream-server.h> | ||
|
@@ -16,13 +17,15 @@ Copyright(c) 2022 Intel Corporation. All Rights Reserved. */ | |
#include <realdds/dds-guid.h> | ||
#include <realdds/dds-topic.h> | ||
#include <realdds/dds-topic-reader.h> | ||
#include <realdds/dds-topic-writer.h> | ||
#include <realdds/dds-utilities.h> | ||
#include <realdds/dds-log-consumer.h> | ||
|
||
#include <librealsense2/utilities/easylogging/easyloggingpp.h> | ||
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp> | ||
#include <fastdds/dds/domain/DomainParticipant.hpp> | ||
#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp> | ||
#include <fastdds/dds/core/status/PublicationMatchedStatus.hpp> | ||
#include <fastrtps/types/DynamicType.h> | ||
|
||
#include <third-party/json.hpp> | ||
|
@@ -218,15 +221,26 @@ PYBIND11_MODULE(NAME, m) { | |
eprosima::fastdds::dds::TypeSupport const &, | ||
char const * >() ) | ||
.def( "get_participant", &dds_topic::get_participant ) | ||
.def( "name", []( dds_topic const & self ) { return self->get_name(); } ) | ||
.def( "__repr__", []( dds_topic const & self ) { | ||
std::ostringstream os; | ||
os << "<" SNAME ".topic \"" << self->get_name() << "\""; | ||
os << ">"; | ||
return os.str(); | ||
} ); | ||
|
||
using durability = eprosima::fastdds::dds::DurabilityQosPolicyKind; | ||
py::enum_< durability >( m, "durability" ) | ||
.value( "volatile", eprosima::fastdds::dds::DurabilityQosPolicyKind::VOLATILE_DURABILITY_QOS ) | ||
.value( "transient_local", eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS ) | ||
.value( "transient", eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_DURABILITY_QOS ); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are not using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not about whether it's being used or not; it's about whether it's available. |
||
using reliability = eprosima::fastdds::dds::ReliabilityQosPolicyKind; | ||
py::enum_< reliability >( m, "reliability" ) | ||
.value( "reliable", eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS ) | ||
.value( "best_effort", eprosima::fastdds::dds::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS ); | ||
|
||
using reader_qos = realdds::dds_topic_reader::qos; | ||
py::class_< reader_qos >( m, "reader_qos" ) | ||
py::class_< reader_qos >( m, "reader_qos" ) // | ||
.def( "__repr__", []( reader_qos const & self ) { | ||
std::ostringstream os; | ||
os << "<" SNAME ".reader_qos"; | ||
|
@@ -237,14 +251,38 @@ PYBIND11_MODULE(NAME, m) { | |
using realdds::dds_topic_reader; | ||
py::class_< dds_topic_reader, std::shared_ptr< dds_topic_reader > >( m, "topic_reader" ) | ||
.def( py::init< std::shared_ptr< dds_topic > const & >() ) | ||
.def( FN_FWD( dds_topic_reader, on_data_available, (), (), callback(); ) ) | ||
.def( FN_FWD( dds_topic_reader, on_data_available, (dds_topic_reader &), (), callback( self ); ) ) | ||
.def( FN_FWD( dds_topic_reader, | ||
on_subscription_matched, | ||
(), | ||
(eprosima::fastdds::dds::SubscriptionMatchedStatus const &), | ||
callback(); ) ) | ||
.def( "topic", &dds_topic_reader::topic ) | ||
.def( "run", &dds_topic_reader::run ) | ||
.def( "qos", []() { return reader_qos(); } ); | ||
.def( "qos", []() { return reader_qos(); } ) | ||
.def( "qos", []( reliability r, durability d ) { return reader_qos( r, d ); } ); | ||
|
||
using writer_qos = realdds::dds_topic_writer::qos; | ||
py::class_< writer_qos >( m, "writer_qos" ) // | ||
.def( "__repr__", []( writer_qos const & self ) { | ||
std::ostringstream os; | ||
os << "<" SNAME ".writer_qos"; | ||
os << ">"; | ||
return os.str(); | ||
} ); | ||
|
||
using realdds::dds_topic_writer; | ||
py::class_< dds_topic_writer, std::shared_ptr< dds_topic_writer > >( m, "topic_writer" ) | ||
.def( py::init< std::shared_ptr< dds_topic > const & >() ) | ||
.def( FN_FWD( dds_topic_writer, | ||
on_publication_matched, | ||
(int), | ||
( eprosima::fastdds::dds::PublicationMatchedStatus const & status ), | ||
callback( status.total_count_change ); ) ) | ||
.def( "topic", &dds_topic_writer::topic ) | ||
.def( "run", &dds_topic_writer::run ) | ||
.def( "qos", []() { return writer_qos(); } ) | ||
.def( "qos", []( reliability r, durability d ) { return writer_qos( r, d ); } ); | ||
|
||
|
||
// The actual types are declared as functions and not classes: the py::init<> inheritance rules are pretty strict | ||
|
@@ -298,19 +336,82 @@ PYBIND11_MODULE(NAME, m) { | |
.def( "create_topic", &device_info::create_topic ) | ||
.attr( "TOPIC_NAME" ) = device_info::TOPIC_NAME; | ||
|
||
//py::class_< raw_device_info, std::shared_ptr< raw_device_info > >( raw, "device_info" ) | ||
// .def( py::init<>() ) | ||
// .def( py::init( []( dds_topic_reader const & reader ) { | ||
// auto actual_type = reader.topic()->get()->get_type_name(); | ||
// if( actual_type != "realdds::topics::raw::device_info" ) | ||
// throw std::runtime_error( "can't initialize raw::device_info from " + actual_type ); | ||
// raw_device_info raw_data; | ||
// eprosima::fastdds::dds::SampleInfo info; | ||
// DDS_API_CALL( reader->take_next_sample( &raw_data, &info ) ); | ||
// if( ! info.valid_data ) | ||
// throw std::runtime_error( "invalid data" ); | ||
// return raw_data; | ||
// } ) ); | ||
using realdds::topics::flexible_msg; | ||
using raw_flexible = realdds::topics::raw::flexible; | ||
types.def( "flexible", []() { return TypeSupport( new flexible_msg::type ); } ); | ||
|
||
py::enum_< flexible_msg::data_format >( m, "flexible.data_format" ) | ||
.value( "json", flexible_msg::data_format::JSON ) | ||
.value( "cbor", flexible_msg::data_format::CBOR ) | ||
.value( "custom", flexible_msg::data_format::CUSTOM ); | ||
|
||
typedef std::shared_ptr< dds_topic > flexible_msg_create_topic( std::shared_ptr< dds_participant > const &, | ||
char const * ); | ||
py::class_< flexible_msg >( m, "flexible_msg" ) | ||
.def( py::init<>() ) | ||
.def( py::init( []( std::string const & json_string ) { | ||
return flexible_msg( flexible_msg::data_format::JSON, nlohmann::json::parse( json_string ) ); | ||
} ) ) | ||
.def_readwrite( "data_format", &flexible_msg::_data_format ) | ||
.def_readwrite( "version", &flexible_msg::_version ) | ||
.def_readwrite( "data", &flexible_msg::_data ) | ||
.def( "invalidate", &flexible_msg::invalidate ) | ||
.def( "is_valid", &flexible_msg::is_valid ) | ||
.def( "__nonzero__", &flexible_msg::is_valid ) // Called to implement truth value testing in Python 2 | ||
.def( "__bool__", &flexible_msg::is_valid ) // Called to implement truth value testing in Python 3 | ||
.def( "__repr__", | ||
[]( flexible_msg const & self ) { | ||
std::ostringstream os; | ||
os << "<" SNAME ".flexible_msg "; | ||
switch( self._data_format ) | ||
{ | ||
case flexible_msg::data_format::JSON: | ||
os << "JSON"; | ||
break; | ||
case flexible_msg::data_format::CBOR: | ||
os << "CBOR"; | ||
break; | ||
case flexible_msg::data_format::CUSTOM: | ||
os << "CUSTOM"; | ||
break; | ||
default: | ||
os << "UNKNOWN(" << (int)self._data_format << ")"; | ||
break; | ||
} | ||
os << ' '; | ||
if( ! self.is_valid() ) | ||
os << "INVALID"; | ||
else | ||
{ | ||
os << self._data.size(); | ||
switch( self._data_format ) | ||
{ | ||
case flexible_msg::data_format::JSON: | ||
os << ' '; | ||
os << std::string( (char const *)self._data.data(), self._data.size() ); | ||
break; | ||
default: | ||
break; | ||
} | ||
} | ||
os << ">"; | ||
return os.str(); | ||
} ) | ||
.def( "take_next", | ||
[]( dds_topic_reader & reader ) { | ||
auto actual_type = reader.topic()->get()->get_type_name(); | ||
if( actual_type != flexible_msg::type().getName() ) | ||
throw std::runtime_error( "can't initialize raw::flexible from " + actual_type ); | ||
flexible_msg data; | ||
if( ! flexible_msg::take_next( reader, &data ) ) | ||
assert( ! data.is_valid() ); | ||
return data; | ||
} ) | ||
.def( "create_topic", static_cast<flexible_msg_create_topic *>( &flexible_msg::create_topic )) | ||
.def( "json_data", []( flexible_msg const & self ) { | ||
return std::string( (char const *)self._data.data(), self._data.size() ); | ||
} ) | ||
.def( "write_to", &flexible_msg::write_to ); | ||
|
||
|
||
using realdds::dds_device_broadcaster; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
# License: Apache 2.0. See LICENSE file in root directory. | ||
# Copyright(c) 2022 Intel Corporation. All Rights Reserved. | ||
|
||
import pyrealdds as dds | ||
from rspy import log, test | ||
import json | ||
|
||
dds.debug( True, log.nested ) | ||
|
||
|
||
participant = dds.participant() | ||
participant.init( 123, "stream-server" ) | ||
|
||
|
||
|
||
class Topic: | ||
""" | ||
Just to enable simple one-line syntax: | ||
Topic( "blah" ).write( '{"field" : 1}' ) | ||
""" | ||
|
||
def __init__( self, name, qos = None ): | ||
if type(name) is str: | ||
self.handle = dds.flexible_msg.create_topic( participant, name ) | ||
elif type(name) is dds.topic: | ||
self.handle = name | ||
else: | ||
raise RuntimeError( "invalid 'name' argument: " + type(name) ) | ||
self.writer = dds.topic_writer( self.handle ) | ||
self.writer.run( qos or dds.topic_writer.qos( dds.reliability.reliable, dds.durability.transient_local ) ) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a left-over from my experiments trying to make it work with the single line. I don't think it's needed any more, but for now harmless. I'll remove later. |
||
|
||
def write( self, json_string ): | ||
dds.flexible_msg( json_string ).write_to( self.writer ) | ||
|
||
|
||
|
||
# From here down, we're in "interactive" mode (see test-device-init.py) | ||
# ... |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
# License: Apache 2.0. See LICENSE file in root directory. | ||
# Copyright(c) 2022 Intel Corporation. All Rights Reserved. | ||
|
||
#test:donotrun:!dds | ||
|
||
import pyrealdds as dds | ||
from rspy import log, test | ||
log.nested = 'C ' | ||
|
||
from time import sleep | ||
import json | ||
|
||
participant = dds.participant() | ||
participant.init( 123, "test-streaming" ) | ||
|
||
|
||
data = {} | ||
def on_data_available( reader ): | ||
global data | ||
topic_name = reader.topic().name() | ||
if topic_name not in data: | ||
data[topic_name] = [] | ||
msg = dds.flexible_msg.take_next( reader ) | ||
if not msg: | ||
log.e( "expected message not received!" ) | ||
else: | ||
log.d( "received:", msg ) | ||
data[topic_name] += [msg] | ||
|
||
def wait_for_data( timeout = 3.0 ): | ||
global data | ||
while not data: | ||
timeout -= 0.25 | ||
if timeout > 0: | ||
sleep( 0.25 ) | ||
else: | ||
raise RuntimeError( "timed out waiting for data" ) | ||
|
||
|
||
sample1 = { 'id' : 'test-message', 'content' : { 'int' : 1, 'float' : 2.0, 'array' : [ 1, 2.0, 'hello' ] } } | ||
sample1_json = json.dumps( sample1 ) | ||
|
||
|
||
import os.path | ||
cwd = os.path.dirname(os.path.realpath(__file__)) | ||
remote_script = os.path.join( cwd, 'streaming-server.py' ) | ||
with test.remote( remote_script, nested_indent=" S" ) as remote: | ||
remote.wait_until_ready() | ||
# | ||
############################################################################################# | ||
# | ||
test.start( "Test 1..." ) | ||
try: | ||
remote.run( 'topic = Topic( "blah" )', timeout=5 ) | ||
# NOTE: technically, we could combine everything into a single command: | ||
# Topic("blah").write( ... ) | ||
# and it would do everything, including destroy the Topic instance with the writer etc. | ||
# This doesn't work! If the writer is destroyed before we've had a chance to get the message, | ||
# the message will get lost... | ||
topic = dds.flexible_msg.create_topic( participant, "blah" ) | ||
reader = dds.topic_reader( topic ) | ||
reader.on_data_available( on_data_available ) | ||
reader.run( dds.topic_reader.qos() ) | ||
assert( not data ) | ||
remote.run( 'topic.write( """' + sample1_json + '""" )', timeout=5 ) | ||
wait_for_data() | ||
if test.check_equal( len(data), 1 ): | ||
if test.check_equal( next( iter( data )), 'blah' ): | ||
test.check_equal( len( data['blah'] ), 1 ) | ||
test.check_equal( json.loads( data['blah'][0].json_data() ), sample1 ) | ||
except: | ||
test.unexpected_exception() | ||
remote.run( 'topic = None', timeout=5 ) | ||
data = [] | ||
test.finish() | ||
# | ||
############################################################################################# | ||
|
||
|
||
test.print_results_and_exit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CBOR is supported now, maybe "Custom data is not supported yet"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it's a function called
json_data()
and if we add other formats, they won't be supported either...