Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming tests using flexible-msg #11102

Merged
merged 5 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions third-party/realdds/src/topics/flexible-msg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,17 @@ flexible_msg::take_next( dds_topic_reader & reader, flexible_msg * output, epros

json flexible_msg::json_data() const
{
if( _data_format != data_format::JSON )
DDS_THROW( runtime_error, "non-json flexible data is still unsupported" );
char const * begin = (char const *)_data.data();
char const * end = begin + _data.size();
return json::parse( begin, end );
if( _data_format == data_format::JSON )
{
char const * begin = (char const *)_data.data();
char const * end = begin + _data.size();
return json::parse( begin, end );
}
if( _data_format == data_format::CBOR )
{
return json::from_cbor( _data.begin(), _data.end() );
}
DDS_THROW( runtime_error, "non-json flexible data is still unsupported" );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CBOR is supported now, maybe "Custom data is not supported yet"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it's a function called json_data() and if we add other formats, they won't be supported either...

}


Expand Down
133 changes: 117 additions & 16 deletions tools/dds/realdds-py/python.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Copyright(c) 2022 Intel Corporation. All Rights Reserved. */
#include <realdds/dds-participant.h>
#include <realdds/topics/dds-topics.h>
#include <realdds/topics/device-info/deviceInfoPubSubTypes.h>
#include <realdds/topics/flexible/flexiblePubSubTypes.h>
#include <realdds/dds-device-broadcaster.h>
#include <realdds/dds-device-server.h>
#include <realdds/dds-stream-server.h>
Expand All @@ -16,13 +17,15 @@ Copyright(c) 2022 Intel Corporation. All Rights Reserved. */
#include <realdds/dds-guid.h>
#include <realdds/dds-topic.h>
#include <realdds/dds-topic-reader.h>
#include <realdds/dds-topic-writer.h>
#include <realdds/dds-utilities.h>
#include <realdds/dds-log-consumer.h>

#include <librealsense2/utilities/easylogging/easyloggingpp.h>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
#include <fastdds/dds/core/status/PublicationMatchedStatus.hpp>
#include <fastrtps/types/DynamicType.h>

#include <third-party/json.hpp>
Expand Down Expand Up @@ -218,15 +221,26 @@ PYBIND11_MODULE(NAME, m) {
eprosima::fastdds::dds::TypeSupport const &,
char const * >() )
.def( "get_participant", &dds_topic::get_participant )
.def( "name", []( dds_topic const & self ) { return self->get_name(); } )
.def( "__repr__", []( dds_topic const & self ) {
std::ostringstream os;
os << "<" SNAME ".topic \"" << self->get_name() << "\"";
os << ">";
return os.str();
} );

using durability = eprosima::fastdds::dds::DurabilityQosPolicyKind;
py::enum_< durability >( m, "durability" )
.value( "volatile", eprosima::fastdds::dds::DurabilityQosPolicyKind::VOLATILE_DURABILITY_QOS )
.value( "transient_local", eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS )
.value( "transient", eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_DURABILITY_QOS );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not using TRANSIENT, if adding values we don't use there is also PERSISTENT durability

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not about whether it's being used or not; it's about whether it's available. PERSISTENT is not supported, so I did not add.

using reliability = eprosima::fastdds::dds::ReliabilityQosPolicyKind;
py::enum_< reliability >( m, "reliability" )
.value( "reliable", eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS )
.value( "best_effort", eprosima::fastdds::dds::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS );

using reader_qos = realdds::dds_topic_reader::qos;
py::class_< reader_qos >( m, "reader_qos" )
py::class_< reader_qos >( m, "reader_qos" ) //
.def( "__repr__", []( reader_qos const & self ) {
std::ostringstream os;
os << "<" SNAME ".reader_qos";
Expand All @@ -237,14 +251,38 @@ PYBIND11_MODULE(NAME, m) {
using realdds::dds_topic_reader;
py::class_< dds_topic_reader, std::shared_ptr< dds_topic_reader > >( m, "topic_reader" )
.def( py::init< std::shared_ptr< dds_topic > const & >() )
.def( FN_FWD( dds_topic_reader, on_data_available, (), (), callback(); ) )
.def( FN_FWD( dds_topic_reader, on_data_available, (dds_topic_reader &), (), callback( self ); ) )
.def( FN_FWD( dds_topic_reader,
on_subscription_matched,
(),
(eprosima::fastdds::dds::SubscriptionMatchedStatus const &),
callback(); ) )
.def( "topic", &dds_topic_reader::topic )
.def( "run", &dds_topic_reader::run )
.def( "qos", []() { return reader_qos(); } );
.def( "qos", []() { return reader_qos(); } )
.def( "qos", []( reliability r, durability d ) { return reader_qos( r, d ); } );

using writer_qos = realdds::dds_topic_writer::qos;
py::class_< writer_qos >( m, "writer_qos" ) //
.def( "__repr__", []( writer_qos const & self ) {
std::ostringstream os;
os << "<" SNAME ".writer_qos";
os << ">";
return os.str();
} );

using realdds::dds_topic_writer;
py::class_< dds_topic_writer, std::shared_ptr< dds_topic_writer > >( m, "topic_writer" )
.def( py::init< std::shared_ptr< dds_topic > const & >() )
.def( FN_FWD( dds_topic_writer,
on_publication_matched,
(int),
( eprosima::fastdds::dds::PublicationMatchedStatus const & status ),
callback( status.total_count_change ); ) )
.def( "topic", &dds_topic_writer::topic )
.def( "run", &dds_topic_writer::run )
.def( "qos", []() { return writer_qos(); } )
.def( "qos", []( reliability r, durability d ) { return writer_qos( r, d ); } );


// The actual types are declared as functions and not classes: the py::init<> inheritance rules are pretty strict
Expand Down Expand Up @@ -298,19 +336,82 @@ PYBIND11_MODULE(NAME, m) {
.def( "create_topic", &device_info::create_topic )
.attr( "TOPIC_NAME" ) = device_info::TOPIC_NAME;

//py::class_< raw_device_info, std::shared_ptr< raw_device_info > >( raw, "device_info" )
// .def( py::init<>() )
// .def( py::init( []( dds_topic_reader const & reader ) {
// auto actual_type = reader.topic()->get()->get_type_name();
// if( actual_type != "realdds::topics::raw::device_info" )
// throw std::runtime_error( "can't initialize raw::device_info from " + actual_type );
// raw_device_info raw_data;
// eprosima::fastdds::dds::SampleInfo info;
// DDS_API_CALL( reader->take_next_sample( &raw_data, &info ) );
// if( ! info.valid_data )
// throw std::runtime_error( "invalid data" );
// return raw_data;
// } ) );
using realdds::topics::flexible_msg;
using raw_flexible = realdds::topics::raw::flexible;
types.def( "flexible", []() { return TypeSupport( new flexible_msg::type ); } );

py::enum_< flexible_msg::data_format >( m, "flexible.data_format" )
.value( "json", flexible_msg::data_format::JSON )
.value( "cbor", flexible_msg::data_format::CBOR )
.value( "custom", flexible_msg::data_format::CUSTOM );

typedef std::shared_ptr< dds_topic > flexible_msg_create_topic( std::shared_ptr< dds_participant > const &,
char const * );
py::class_< flexible_msg >( m, "flexible_msg" )
.def( py::init<>() )
.def( py::init( []( std::string const & json_string ) {
return flexible_msg( flexible_msg::data_format::JSON, nlohmann::json::parse( json_string ) );
} ) )
.def_readwrite( "data_format", &flexible_msg::_data_format )
.def_readwrite( "version", &flexible_msg::_version )
.def_readwrite( "data", &flexible_msg::_data )
.def( "invalidate", &flexible_msg::invalidate )
.def( "is_valid", &flexible_msg::is_valid )
.def( "__nonzero__", &flexible_msg::is_valid ) // Called to implement truth value testing in Python 2
.def( "__bool__", &flexible_msg::is_valid ) // Called to implement truth value testing in Python 3
.def( "__repr__",
[]( flexible_msg const & self ) {
std::ostringstream os;
os << "<" SNAME ".flexible_msg ";
switch( self._data_format )
{
case flexible_msg::data_format::JSON:
os << "JSON";
break;
case flexible_msg::data_format::CBOR:
os << "CBOR";
break;
case flexible_msg::data_format::CUSTOM:
os << "CUSTOM";
break;
default:
os << "UNKNOWN(" << (int)self._data_format << ")";
break;
}
os << ' ';
if( ! self.is_valid() )
os << "INVALID";
else
{
os << self._data.size();
switch( self._data_format )
{
case flexible_msg::data_format::JSON:
os << ' ';
os << std::string( (char const *)self._data.data(), self._data.size() );
break;
default:
break;
}
}
os << ">";
return os.str();
} )
.def( "take_next",
[]( dds_topic_reader & reader ) {
auto actual_type = reader.topic()->get()->get_type_name();
if( actual_type != flexible_msg::type().getName() )
throw std::runtime_error( "can't initialize raw::flexible from " + actual_type );
flexible_msg data;
if( ! flexible_msg::take_next( reader, &data ) )
assert( ! data.is_valid() );
return data;
} )
.def( "create_topic", static_cast<flexible_msg_create_topic *>( &flexible_msg::create_topic ))
.def( "json_data", []( flexible_msg const & self ) {
return std::string( (char const *)self._data.data(), self._data.size() );
} )
.def( "write_to", &flexible_msg::write_to );


using realdds::dds_device_broadcaster;
Expand Down
38 changes: 38 additions & 0 deletions unit-tests/dds/streaming-server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# License: Apache 2.0. See LICENSE file in root directory.
# Copyright(c) 2022 Intel Corporation. All Rights Reserved.

import pyrealdds as dds
from rspy import log, test
import json

dds.debug( True, log.nested )


participant = dds.participant()
participant.init( 123, "stream-server" )



class Topic:
"""
Just to enable simple one-line syntax:
Topic( "blah" ).write( '{"field" : 1}' )
"""

def __init__( self, name, qos = None ):
if type(name) is str:
self.handle = dds.flexible_msg.create_topic( participant, name )
elif type(name) is dds.topic:
self.handle = name
else:
raise RuntimeError( "invalid 'name' argument: " + type(name) )
self.writer = dds.topic_writer( self.handle )
self.writer.run( qos or dds.topic_writer.qos( dds.reliability.reliable, dds.durability.transient_local ) )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why transient_local?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a left-over from my experiments trying to make it work with the single line. I don't think it's needed any more, but for now harmless. I'll remove later.


def write( self, json_string ):
dds.flexible_msg( json_string ).write_to( self.writer )



# From here down, we're in "interactive" mode (see test-device-init.py)
# ...
80 changes: 80 additions & 0 deletions unit-tests/dds/test-streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# License: Apache 2.0. See LICENSE file in root directory.
# Copyright(c) 2022 Intel Corporation. All Rights Reserved.

#test:donotrun:!dds

import pyrealdds as dds
from rspy import log, test
log.nested = 'C '

from time import sleep
import json

participant = dds.participant()
participant.init( 123, "test-streaming" )


data = {}
def on_data_available( reader ):
global data
topic_name = reader.topic().name()
if topic_name not in data:
data[topic_name] = []
msg = dds.flexible_msg.take_next( reader )
if not msg:
log.e( "expected message not received!" )
else:
log.d( "received:", msg )
data[topic_name] += [msg]

def wait_for_data( timeout = 3.0 ):
global data
while not data:
timeout -= 0.25
if timeout > 0:
sleep( 0.25 )
else:
raise RuntimeError( "timed out waiting for data" )


sample1 = { 'id' : 'test-message', 'content' : { 'int' : 1, 'float' : 2.0, 'array' : [ 1, 2.0, 'hello' ] } }
sample1_json = json.dumps( sample1 )


import os.path
cwd = os.path.dirname(os.path.realpath(__file__))
remote_script = os.path.join( cwd, 'streaming-server.py' )
with test.remote( remote_script, nested_indent=" S" ) as remote:
remote.wait_until_ready()
#
#############################################################################################
#
test.start( "Test 1..." )
try:
remote.run( 'topic = Topic( "blah" )', timeout=5 )
# NOTE: technically, we could combine everything into a single command:
# Topic("blah").write( ... )
# and it would do everything, including destroy the Topic instance with the writer etc.
# This doesn't work! If the writer is destroyed before we've had a chance to get the message,
# the message will get lost...
topic = dds.flexible_msg.create_topic( participant, "blah" )
reader = dds.topic_reader( topic )
reader.on_data_available( on_data_available )
reader.run( dds.topic_reader.qos() )
assert( not data )
remote.run( 'topic.write( """' + sample1_json + '""" )', timeout=5 )
wait_for_data()
if test.check_equal( len(data), 1 ):
if test.check_equal( next( iter( data )), 'blah' ):
test.check_equal( len( data['blah'] ), 1 )
test.check_equal( json.loads( data['blah'][0].json_data() ), sample1 )
except:
test.unexpected_exception()
remote.run( 'topic = None', timeout=5 )
data = []
test.finish()
#
#############################################################################################


test.print_results_and_exit()
5 changes: 1 addition & 4 deletions unit-tests/py/rspy/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,7 @@ def check_equal(result, expected, abort_if_failed = False):
:return: True if assertion passed, False otherwise
"""
if type(expected) == list:
log.out("check_equal should not be used for lists. Use check_equal_lists instead")
if abort_if_failed:
abort()
return False
raise RuntimeError( "check_equal should not be used for lists. Use check_equal_lists instead" )
global n_assertions
n_assertions += 1
if type(expected) != type(result):
Expand Down