Dispatcher exception + high CPU usage fix #9258

Merged · 9 commits · Jun 22, 2021
Changes from 8 commits
41 changes: 26 additions & 15 deletions common/utilities/time/waiting-on.h
@@ -31,6 +31,7 @@ class waiting_on
{
T _value;
std::condition_variable _cv;
std::atomic_bool _valid{ true };

friend class waiting_on;

@@ -58,9 +59,14 @@ class waiting_on
{
_cv.notify_one();
}
void signal_all()
// Invalidate the wait_state_t so users do not access destroyed objects
void invalidate()
{
_cv.notify_all();
if ( _valid )
{
_valid = false;
_cv.notify_all();
}
}
};
private:
@@ -76,23 +82,24 @@
public:
class in_thread_
{
std::weak_ptr< class wait_state_t > const _ptr;
std::weak_ptr< wait_state_t > const _ptr;
// We use an invalidator to invalidate the wait_state_t once its reference count drops to zero
std::shared_ptr< std::nullptr_t > const _invalidator;

public:
in_thread_( waiting_on const& local )
in_thread_( waiting_on const & local )
: _ptr( local._ptr )
, _invalidator(
nullptr,
[weak_ptr = std::weak_ptr< wait_state_t >( local._ptr )]( std::nullptr_t * ) {
// We get here when the lambda we're in is destroyed -- so either we've
// already run (and signalled once) or we've never run. We signal anyway --
// if anything's waiting, they'll get woken up; otherwise nothing'll happen...
if( auto wait_state = weak_ptr.lock() )
wait_state->invalidate();
} )
{
}
#if 0 // TODO this causes major slowdowns! left in here for Eran to break his head against...
~in_thread_()
{
// We get here when the lambda we're in is destroyed -- so either we've already run
// (and signalled once) or we've never run. We signal anyway -- if anything's waiting
// they'll get woken up; otherwise nothing'll happen...
if( auto wait_state = still_alive() )
wait_state->signal_all();
}
#endif

std::shared_ptr< wait_state_t > still_alive() const { return _ptr.lock(); }

@@ -143,7 +150,11 @@ class waiting_on
// Following will issue (from CppCheck):
// warning: The lock is ineffective because the mutex is locked at the same scope as the mutex itself. [localMutex]
std::unique_lock< std::mutex > locker( m );
_ptr->_cv.wait_for( locker, timeout, pred );
_ptr->_cv.wait_for( locker, timeout, [&]() -> bool {
if( ! _ptr->_valid )
return true;
return pred();
} );
}
};

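Aside: the invalidator used by in_thread_ above is a shared_ptr< std::nullptr_t > that carries no data; its only job is to run a custom deleter once the last copy (including copies captured by dispatched lambdas) goes away, and that deleter invalidates the wait state through a weak_ptr. Below is a minimal, self-contained sketch of the same idea; the names are illustrative and not taken from the library.

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <memory>

struct shared_state
{
    std::atomic_bool valid{ true };
    std::condition_variable cv;

    void invalidate()
    {
        if( valid.exchange( false ) )
            cv.notify_all();  // wake any waiter so it notices the state is gone
    }
};

int main()
{
    auto state = std::make_shared< shared_state >();
    {
        // The "invalidator" owns nothing; its deleter fires when the last copy
        // is destroyed and invalidates the state if it is still alive
        std::shared_ptr< std::nullptr_t > invalidator(
            nullptr,
            [weak = std::weak_ptr< shared_state >( state )]( std::nullptr_t * ) {
                if( auto s = weak.lock() )
                    s->invalidate();
            } );
    }  // deleter runs here
    std::cout << "valid? " << state->valid << std::endl;  // prints 0
    return 0;
}

This mirrors the reason signal_all() became invalidate(): a thread woken after the producing side is gone can check the _valid flag instead of evaluating a predicate against destroyed objects.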
7 changes: 7 additions & 0 deletions src/concurrency.h
@@ -354,8 +354,15 @@ class dispatcher
//
bool flush();


private:
// Returns true if the dispatcher was started within the given timeout,
// false if not, or if the dispatcher is no longer alive
//
bool _wait_for_start( int timeout_ms );

friend cancellable_timer;

single_consumer_queue<std::function<void(cancellable_timer)>> _queue;
std::thread _thread;

62 changes: 41 additions & 21 deletions src/dispatcher.cpp
@@ -17,25 +17,27 @@ dispatcher::dispatcher( unsigned int cap, std::function< void( action ) > on_dro
int timeout_ms = 5000;
while( _is_alive )
{
std::function< void( cancellable_timer ) > item;

if( _queue.dequeue( &item, timeout_ms ) )
if( _wait_for_start( timeout_ms ) )
{
cancellable_timer time(this);

try
{
// While we're dispatching the item, we cannot stop!
std::lock_guard< std::mutex > lock( _dispatch_mutex );
item( time );
}
catch( const std::exception & e )
std::function< void(cancellable_timer) > item;
if (_queue.dequeue(&item, timeout_ms))
{
LOG_ERROR( "Dispatcher [" << this << "] exception caught: " << e.what() );
}
catch( ... )
{
LOG_ERROR( "Dispatcher [" << this << "] unknown exception caught!" );
cancellable_timer time(this);

try
{
// While we're dispatching the item, we cannot stop!
std::lock_guard< std::mutex > lock(_dispatch_mutex);
item(time);
}
catch (const std::exception& e)
{
LOG_ERROR("Dispatcher [" << this << "] exception caught: " << e.what());
}
catch (...)
{
LOG_ERROR("Dispatcher [" << this << "] unknown exception caught!");
}
}
}
}
@@ -59,10 +61,14 @@ dispatcher::~dispatcher()

void dispatcher::start()
{
std::lock_guard< std::mutex > lock( _was_stopped_mutex );
_was_stopped = false;

{
std::lock_guard< std::mutex > lock(_was_stopped_mutex);
_was_stopped = false;
}
_queue.start();
// Wake up all threads that wait for the dispatcher to start
_was_stopped_cv.notify_all();

}


@@ -84,8 +90,8 @@ void dispatcher::stop()
{
std::lock_guard< std::mutex > lock( _was_stopped_mutex );
_was_stopped = true;
_was_stopped_cv.notify_all();
}
_was_stopped_cv.notify_all();

// Wait until any dispatched is done...
{
@@ -113,3 +119,17 @@ bool dispatcher::flush()
} );
return invoked;
}

// Returns true if the dispatcher was started within the given timeout,
// false if not, or if the dispatcher is no longer alive
//
bool dispatcher::_wait_for_start( int timeout_ms )
{
// If the dispatcher is not started, wait for a start event; if no such event arrives within the given timeout, do nothing.
// If the destructor is called during the wait (_is_alive == false), do nothing as well.
std::unique_lock< std::mutex > lock(_was_stopped_mutex);
return _was_stopped_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() {
return !_was_stopped.load() || !_is_alive;
} ) && _is_alive;
}
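The synchronization changes above amount to three things: start() now wakes any thread waiting on _was_stopped_cv, stop() notifies after releasing _was_stopped_mutex, and the run loop parks in _wait_for_start() instead of repeatedly timing out on the queue while stopped, which is what removed the high CPU usage. A minimal sketch of the same wait-for-start idiom follows; the class and method names are illustrative only, not the dispatcher's actual API.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

class startable
{
    std::mutex _m;
    std::condition_variable _cv;
    std::atomic_bool _stopped{ true };
    std::atomic_bool _alive{ true };

public:
    void start()
    {
        {
            std::lock_guard< std::mutex > lock( _m );
            _stopped = false;
        }
        // Notify after releasing the lock so a woken waiter does not
        // immediately block on a mutex the notifier still holds
        _cv.notify_all();
    }

    void shut_down()
    {
        {
            std::lock_guard< std::mutex > lock( _m );
            _alive = false;
        }
        _cv.notify_all();
    }

    // Returns true only if started and still alive within the timeout
    bool wait_for_start( int timeout_ms )
    {
        std::unique_lock< std::mutex > lock( _m );
        return _cv.wait_for( lock,
                             std::chrono::milliseconds( timeout_ms ),
                             [this]() { return ! _stopped || ! _alive; } )
            && _alive;
    }
};

A worker loop built on this shape sleeps on the condition variable while stopped and only polls its work queue once wait_for_start() returns true, rather than spinning on dequeue timeouts.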

4 changes: 3 additions & 1 deletion src/media/playback/playback_sensor.cpp
@@ -71,7 +71,7 @@ void playback_sensor::open(const stream_profiles& requests)
//Playback can only play the streams that were recorded.
//Go over the requested profiles and see if they are available
LOG_DEBUG("Open Sensor " << m_sensor_id);

std::lock_guard<std::mutex> l(m_mutex);
for (auto&& r : requests)
{
if (std::find_if(std::begin(m_available_profiles),
@@ -94,6 +94,7 @@
std::make_shared< dispatcher >( _default_queue_size, on_drop_callback ) ) );

m_dispatchers[profile->get_unique_id()]->start();

device_serializer::stream_identifier f{ get_device_index(), m_sensor_id, profile->get_stream_type(), static_cast<uint32_t>(profile->get_stream_index()) };
opened_streams.push_back(f);
}
@@ -104,6 +105,7 @@
void playback_sensor::close()
{
LOG_DEBUG("Close sensor " << m_sensor_id);
std::lock_guard<std::mutex> l(m_mutex);
std::vector<device_serializer::stream_identifier> closed_streams;
for (auto&& dispatcher : m_dispatchers)
{
91 changes: 91 additions & 0 deletions unit-tests/utilities/concurrency/test-dispatcher.cpp
@@ -0,0 +1,91 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2021 Intel Corporation. All Rights Reserved.

//#cmake:add-file ../../../src/dispatcher.cpp

#include <unit-tests/test.h>
#include <common/utilities/time/timer.h>
#include <src/concurrency.h>

#include <algorithm>
#include <vector>

using namespace utilities::time;

// We use this function as a CPU stress test
int fibo( int num )
{
if( num < 2 )
return 1;
return fibo( num - 1 ) + fibo( num - 2 );
}

TEST_CASE( "dispatcher main flow" )
{
dispatcher d(2);
std::atomic_bool run = { false };
auto func = [&](dispatcher::cancellable_timer c)
{
run = true;
};

d.start();
REQUIRE(d.empty());
d.invoke(func);
REQUIRE_FALSE(d.empty());
REQUIRE(d.flush());
REQUIRE(run);
d.stop();
}

TEST_CASE( "invoke and wait" )
{
dispatcher d(2);

std::atomic_bool run = { false };
auto func = [&](dispatcher::cancellable_timer c)
{
std::this_thread::sleep_for(std::chrono::seconds(3));
Collaborator review comment: This should really be using the cancellable_timer... and then we can test that, too... something to think about :)

run = true;
};

d.start();
stopwatch sw;
d.invoke_and_wait(func, []() {return false; }, true);
REQUIRE(sw.get_elapsed_ms() > 3000); // verify we get here only after the function call ended
d.stop();
}

TEST_CASE("verify stop() not consuming high CPU usage")
{
// Using shared_ptr because dispatcher has no copy constructor
std::vector<std::shared_ptr<dispatcher>> dispatchers;

for (int i = 0 ; i < 32; ++i)
{
dispatchers.push_back(std::make_shared<dispatcher>(10));
}

for (auto &&dispatcher : dispatchers)
{
dispatcher->start();
}

for (auto&& dispatcher : dispatchers)
{
dispatcher->stop();
}


// Allow some time for all threads to do some work
std::this_thread::sleep_for(std::chrono::seconds(5));

stopwatch sw;

// Do some stress work
REQUIRE(fibo(40) == 165580141);
// Verify the stress test did not take too long.
// We had an issue where calling stop() caused high CPU usage and stalled other operations:
// this test took > 9 seconds on an 8-core PC; after the fix it took ~1.5 sec running on a single core.
REQUIRE(sw.get_elapsed_ms() < 5000);
}
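Regarding the review note embedded in the "invoke and wait" test above: the fixed three-second sleep could be replaced with a wait the test can cancel. The sketch below is a library-agnostic illustration of such a cancellable sleep, not the dispatcher's actual cancellable_timer API, which this diff does not show.

#include <chrono>
#include <condition_variable>
#include <mutex>

// Returns true if the full duration elapsed, false if cancel() was called first
class cancellable_sleep
{
    std::mutex _m;
    std::condition_variable _cv;
    bool _cancelled = false;

public:
    void cancel()
    {
        {
            std::lock_guard< std::mutex > lock( _m );
            _cancelled = true;
        }
        _cv.notify_all();
    }

    bool sleep_for( std::chrono::milliseconds duration )
    {
        std::unique_lock< std::mutex > lock( _m );
        // wait_for returns false on timeout, i.e. the sleep ran to completion
        return ! _cv.wait_for( lock, duration, [this]() { return _cancelled; } );
    }
};

A test built on this shape could assert both the run-to-completion path and the cancelled path, which the current fixed sleep cannot.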