Fix no composite frame on playback (from Nir) #9210

Merged
10 commits merged on Jun 13, 2021
1 change: 1 addition & 0 deletions common/utilities/CMakeLists.txt
@@ -7,6 +7,7 @@ target_sources(${LRS_TARGET}
"${CMAKE_CURRENT_LIST_DIR}/time/stopwatch.h"
"${CMAKE_CURRENT_LIST_DIR}/time/timer.h"
"${CMAKE_CURRENT_LIST_DIR}/time/periodic_timer.h"
"${CMAKE_CURRENT_LIST_DIR}/time/waiting-on.h"
"${CMAKE_CURRENT_LIST_DIR}/time/work_week.h"
"${CMAKE_CURRENT_LIST_DIR}/time/work_week.cpp"
"${CMAKE_CURRENT_LIST_DIR}/time/l500/get-mfr-ww.h"
134 changes: 134 additions & 0 deletions common/utilities/time/waiting-on.h
@@ -0,0 +1,134 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2021 Intel Corporation. All Rights Reserved.

#pragma once

#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>


namespace utilities {
namespace time {

// Helper class -- encapsulate a variable of type T that we want to wait on: another thread will set
// it and signal when we can continue...
//
template< class T >
class waiting_on
Comment on lines +22 to +23
Collaborator:
The class seems to mimic std::future; why is it needed?

Collaborator (author):
Because we're not using std::future... and the whole mechanism of the dispatcher is written without it.
Perhaps we can refactor this class to use it later, but for now... no.
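
(For context, a std::future-based flush might have looked roughly like the sketch below. This is not what the PR does, and it ignores _was_stopped and the rest of the dispatcher machinery.)

    // Hypothetical future-based equivalent -- a sketch only; requires <future>.
    // The shared_ptr keeps the promise alive even if we time out and return
    // before the dispatcher ever runs the lambda.
    bool dispatcher::flush()
    {
        auto done = std::make_shared< std::promise< void > >();
        invoke( [done]( cancellable_timer ) { done->set_value(); } );
        return done->get_future().wait_for( std::chrono::seconds( 10 ) )
            == std::future_status::ready;
    }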

{
// We need to be careful with timeouts: if we time out then the local waiting_on can go out of
// scope and then the thread cannot set anything or signal anyone! We get around this by using a
// shared_ptr & weak_ptr to manage access:
//
public:
class wait_state_t
{
T _value;
std::condition_variable _cv;

friend class waiting_on;

public:
wait_state_t() = default; // allow default ctor
wait_state_t( T const & t )
: _value( t )
{
}

operator T &() { return _value; }
Contributor:
Do we want to allow changing the value without signaling?

Collaborator (author):
Sure, the set-and-signal function is just a convenience function
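
For illustration (not part of the diff), setting the value through operator T&() and only then signaling would look like:

    utilities::time::waiting_on< int >::wait_state_t state( 0 );
    static_cast< int & >( state ) = 42;   // change the value; nobody is woken up
    state.signal();                       // wake the waiter with the current value (42)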

operator T const &() const { return _value; }

T* operator->() { return &_value; }
T const* operator->() const { return &_value; }

// Set a new value and signal
void signal( T const & t )
{
_value = t;
signal();
}
// Signal with the current value
void signal()
{
_cv.notify_one();
}
};
private:
std::shared_ptr< wait_state_t > _ptr;

// When we declare the signalling lambda for the other thread, we need to pass it the weak_ptr.
// This class wraps it up nicely, so you can write:
// waiting_on< bool > invoked( false )
// auto thread_function = [invoked = invoked.in_thread()]() {
// invoked.signal( true );
// }
//
public:
class in_thread_
{
std::weak_ptr< class wait_state_t > const _ptr;

public:
in_thread_( waiting_on const& local )
: _ptr( local._ptr )
{
}

std::shared_ptr< wait_state_t > still_alive() const { return _ptr.lock(); }

// Wake up the local function (which is presumably using wait_until()) with a new
// T value
void signal( T const& t ) const
Contributor:
Consider changing it to safe_signal(); I think that makes its added value clear.

Collaborator (author):
Nothing makes it safe... it's just a convenience

{
if( auto wait_state = still_alive() )
wait_state->signal( t );
Comment on lines +103 to +104
Collaborator:
The operation needs to be atomic to avoid a race condition.

Collaborator (author):
The whole class is meant to be used in a master-slave kind of relationship: one thread running one function (and not multiple threads running one function). You're waiting on something to happen with a timeout. That's all.

The use-case of multiple threads all trying to signal the same waiting object is still possible -- this class does not decide for the user what the synchronization mechanism is. The user writes the lambda, which can worry about synchronization if it needs it. I don't think it's the job of the class.

I'll add the above to the class comment.
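
To illustrate (hypothetical code, not part of the PR; sync and n_done are made-up names): if several threads may signal the same waiting_on, the user-written lambda can bring its own lock.

    auto sync = std::make_shared< std::mutex >();
    utilities::time::waiting_on< int > n_done( 0 );
    auto on_worker_done = [n_done = n_done.in_thread(), sync]() {
        std::lock_guard< std::mutex > lock( *sync );   // synchronizes the signalers among themselves
        if( auto state = n_done.still_alive() )
            state->signal( *state + 1 );               // read-modify-write, then notify
    };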

}
};

public:
waiting_on()
: _ptr( std::make_shared< wait_state_t >() )
{
}
waiting_on( T const & value )
: _ptr( std::make_shared< wait_state_t >( value ) )
{
}

// Convert to the in-thread representation
in_thread_ in_thread() const { return in_thread_( *this ); }

operator T const &() const { return _ptr->_value; }
Contributor:
Can't this use wait_state_t's operator T &()?

Collaborator (author):
Changed to *_ptr instead of _ptr->_value
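
Presumably the line now reads something like this, relying on wait_state_t's own conversion operator:

    operator T const &() const { return *_ptr; }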


// struct value_t { double x; int k; };
// waiting_on< value_t > output({ 1., -1 });
// output->x = 2.;
T * operator->() { return &_ptr->_value; }
Contributor:
same

Collaborator (author):
No easy syntax that can be used here

T const * operator->() const { return &_ptr->_value; }
Contributor:
same

Collaborator (author):
same


// Wait until either the timeout occurs, or the predicate evaluates to true.
// Equivalent to:
// while( ! pred() )
// {
// wait( timeout );
// if( timed-out )
// break;
// }
template < class U, class L >
void wait_until( U const& timeout, L const& pred )
Contributor:
I think it's better if wait_until returned bool: true = predicate satisfied, false = timeout.

Collaborator (author):
I prefer no return value; it keeps the code more readable.
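
(For reference, the suggested variant would be roughly a one-line change, since std::condition_variable::wait_for with a predicate already returns whether the predicate held. A sketch, not the merged code:)

    template< class U, class L >
    bool wait_until( U const & timeout, L const & pred )
    {
        std::mutex m;
        std::unique_lock< std::mutex > locker( m );
        return _ptr->_cv.wait_for( locker, timeout, pred );   // true = predicate satisfied, false = timed out
    }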

{
// Note that the mutex is useless and used only for the wait -- we assume here that access
// to the T data does not need mutual exclusion
std::mutex m;
// The following will trigger this CppCheck warning:
// warning: The lock is ineffective because the mutex is locked at the same scope as the mutex itself. [localMutex]
std::unique_lock< std::mutex > locker( m );
Contributor:
I think you need to lock the mutex on signal() too

Collaborator (author):
It's up to the user to write the lambda with concurrency in mind, if that's what's needed (for example, in dispatcher.cpp it's not needed)

_ptr->_cv.wait_for( locker, timeout, pred );
}
};


} // namespace time
} // namespace utilities
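
A minimal usage sketch, for orientation (not part of the diff; the worker body and the one-second timeout are made up): wait for a worker thread to report completion, without crashing if we time out and the worker signals afterwards.

    utilities::time::waiting_on< bool > done( false );
    std::thread( [done = done.in_thread()]() {
        // ... do the actual work here ...
        done.signal( true );   // a no-op if the waiter already went out of scope
    } ).detach();
    done.wait_until( std::chrono::seconds( 1 ), [&]() { return (bool) done; } );
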
34 changes: 21 additions & 13 deletions src/core/streaming.cpp
@@ -14,23 +14,31 @@ namespace librealsense
std::string frame_to_string(const frame_interface & f)
{
std::ostringstream s;
auto composite = dynamic_cast<const composite_frame *>(&f);
if (composite)

if (!&f)
{
s << "[";
for (int i = 0; i < composite->get_embedded_frames_count(); i++)
{
s << *composite->get_frame(i);
}
s << "]";
s << "[null]";
}
else
{
s << "[" << f.get_stream()->get_stream_type();
s << "/" << f.get_stream()->get_unique_id();
s << " #" << f.get_frame_number();
s << " @" << std::fixed << (double)f.get_frame_timestamp();
s << "]";
auto composite = dynamic_cast<const composite_frame*>(&f);
if (composite)
{
s << "[";
for (int i = 0; i < composite->get_embedded_frames_count(); i++)
{
s << *composite->get_frame(i);
}
s << "]";
}
else
{
s << "[" << f.get_stream()->get_stream_type();
s << "/" << f.get_stream()->get_unique_id();
s << " #" << f.get_frame_number();
s << " @" << std::fixed << (double)f.get_frame_timestamp();
s << "]";
}
}
return s.str();
}
40 changes: 20 additions & 20 deletions src/dispatcher.cpp
@@ -2,6 +2,8 @@
// Copyright(c) 2021 Intel Corporation. All Rights Reserved.

#include "concurrency.h"
#include "types.h"
#include "../common/utilities/time/waiting-on.h"


dispatcher::dispatcher( unsigned int cap, std::function< void( action ) > on_drop_callback )
@@ -27,8 +29,13 @@ dispatcher::dispatcher( unsigned int cap, std::function< void( action ) > on_dro
std::lock_guard< std::mutex > lock( _dispatch_mutex );
item( time );
}
catch( const std::exception & e )
{
LOG_ERROR( "Dispatcher [" << this << "] exception caught: " << e.what() );
}
catch( ... )
{
LOG_ERROR( "Dispatcher [" << this << "] unknown exception caught!" );
}
}
}
@@ -88,28 +95,21 @@ void dispatcher::stop()
}


// Return when all items in the queue are finished (within a timeout).
// Return when all current items in the queue are finished (within a timeout).
// If additional items are added while we're waiting, those will not be waited on!
// Returns false if a timeout occurred before we were done
//
bool dispatcher::flush()
{
std::mutex m;
std::condition_variable cv;
bool invoked = false;
auto wait_sucess = std::make_shared<std::atomic_bool>(true);
invoke([&, wait_sucess](cancellable_timer t)
{
///TODO: use _queue to flush, and implement properly
if (_was_stopped || !(*wait_sucess))
return;

{
std::lock_guard<std::mutex> locker(m);
invoked = true;
}
cv.notify_one();
});
std::unique_lock<std::mutex> locker(m);
*wait_sucess = cv.wait_for(locker, std::chrono::seconds(10), [&]() { return invoked || _was_stopped; });
return *wait_sucess;
if( _was_stopped )
return true; // Nothing to do - so success (no timeout)

utilities::time::waiting_on< bool > invoked( false );
invoke( [invoked = invoked.in_thread()]( cancellable_timer ) {
invoked.signal( true );
} );
invoked.wait_until( std::chrono::seconds( 10 ), [&]() {
return invoked || _was_stopped;
} );
return invoked;
}