Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow saving on CPU usage for infrequent inference requests by reducing thread spinning #11841

Merged
merged 5 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,133 changes: 574 additions & 559 deletions include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions include/onnxruntime/core/platform/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ class ThreadPool {
"Per-thread state should be trivially destructible");
};

// The two methods below turn thread spinning on and off.
// Disabling spinning supports real-time scenarios where
// spinning between relatively infrequent requests
// contributes to high CPU usage while nothing is being processed.
void EnableSpinning();

void DisableSpinning();

// Schedules fn() for execution in the pool of threads. The function may run
// synchronously if it cannot be enqueued. This will occur if the thread pool's
// degree-of-parallelism is 1, but it may also occur for implementation-dependent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ static const char* const kOrtSessionOptionsConfigNnapiEpPartitioningStopOps = "e
// Available since version 1.11.
static const char* const kOrtSessionOptionsConfigDynamicBlockBase = "session.dynamic_block_base";

// This option decreases CPU usage between infrequent requests by
// forcing any spinning thread-pool threads to stop immediately when the last of
// the concurrent Run() calls returns.
// Spinning is restarted on the next Run() call.
// Applies only to internal thread-pools.
// "1": spinning is stopped between runs.
// "0": spinning is left unchanged between runs. The default.
static const char* const kOrtSessionOptionsConfigForceSpinningStop = "session.force_spinning_stop_between_runs";
yuslepukhin marked this conversation as resolved.
Show resolved Hide resolved

// "1": all inconsistencies encountered during shape and type inference
// will result in failures.
// "0": in some cases warnings will be logged but processing will continue. The default.
Expand Down
12 changes: 12 additions & 0 deletions onnxruntime/core/common/threadpool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,18 @@ std::string ThreadPool::StopProfiling(concurrency::ThreadPool* tp) {
}
}

// Turns spinning on in the extended Eigen thread pool.
// Does nothing when this ThreadPool has no such pool.
void ThreadPool::EnableSpinning() {
  if (!extended_eigen_threadpool_) {
    return;
  }
  extended_eigen_threadpool_->EnableSpinning();
}

// Turns spinning off in the extended Eigen thread pool.
// Does nothing when this ThreadPool has no such pool.
void ThreadPool::DisableSpinning() {
  if (!extended_eigen_threadpool_) {
    return;
  }
  extended_eigen_threadpool_->DisableSpinning();
}

// Return the number of threads created by the pool.
int ThreadPool::NumThreads() const {
if (underlying_threadpool_) {
Expand Down
6 changes: 0 additions & 6 deletions onnxruntime/core/providers/cpu/rnn/deep_cpu_gru.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,12 +594,6 @@ void UniDirectionalGru<T>::Compute(const gsl::span<const T>& inputs_arg,
}

{
// Enter a parallel section encompassing the kernels invoked
// below. This lets the runtime system amortize loop entry/exit
// costs over a series of short kernels, and promotes cache
// affinity between iterations of successive loops.
onnxruntime::concurrency::ThreadPool::ParallelSection ps(ttp_);
yuslepukhin marked this conversation as resolved.
Show resolved Hide resolved

// for each item in sequence run all calculations
for (int step = 0; step < max_sequence_length; step++) {
#if defined(DUMP_MATRIXES)
Expand Down
39 changes: 39 additions & 0 deletions onnxruntime/core/session/inference_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ void InferenceSession::ConstructorCommon(const SessionOptions& session_options,
}

use_per_session_threads_ = session_options.use_per_session_threads;
force_spinning_stop_between_runs_ = session_options_.config_options.GetConfigOrDefault(kOrtSessionOptionsConfigForceSpinningStop, "0") == "1";

if (use_per_session_threads_) {
LOGS(*session_logger_, INFO) << "Creating and using per session threadpools since use_per_session_threads_ is true";
Expand Down Expand Up @@ -1930,6 +1931,44 @@ Status InferenceSession::Run(const RunOptions& run_options,
#ifdef DEBUG_NODE_INPUTS_OUTPUTS
session_state_->IncrementGraphExecutionCounter();
#endif

// Scope guard that keeps thread-pool spinning enabled while at least one
// guard instance sharing the same counter is alive.
//
// Construction increments the shared counter; the instance that moves it from
// 0 to 1 enables spinning on both pools. Destruction decrements the counter;
// the instance that moves it from 1 to 0 disables spinning on both pools.
// Either pool pointer may be null, in which case that pool is skipped.
//
// NOTE(review): the counter update and the Enable/Disable calls are separate
// steps, so a guard being constructed while the last guard is being destroyed
// may observe spinning being disabled after it enabled it. Confirm that this
// window is acceptable for callers.
struct ThreadPoolSpinningSwitch {
  using PS = onnxruntime::concurrency::ThreadPool::ParallelSection;

  concurrency::ThreadPool* intra_tp_;  // may be null
  concurrency::ThreadPool* inter_tp_;  // may be null
  std::atomic_int32_t& counter_ref_;   // number of live guards sharing this counter

  // Use this to jump start threads and amortize the costs
  // of initialization between the kernels
  // note this prevents using explicit PS in the nodes
  // or an additional PS for inter op thread-pool
  std::optional<PS> ps_intra_;

  ThreadPoolSpinningSwitch(concurrency::ThreadPool* intra_tp,
                           concurrency::ThreadPool* inter_tp,
                           std::atomic_int32_t& ref) noexcept
      : intra_tp_(intra_tp), inter_tp_(inter_tp), counter_ref_(ref) {
    // Only the guard that observes the counter at zero turns spinning on.
    if (counter_ref_.fetch_add(1, std::memory_order_relaxed) == 0) {
      if (intra_tp_) intra_tp_->EnableSpinning();
      if (inter_tp_) inter_tp_->EnableSpinning();
    }
    if (intra_tp_) {
      ps_intra_.emplace(intra_tp_);
    }
  }

  // The destructor mutates the shared counter, so a copied or moved-from
  // instance would decrement it a second time. Forbid both.
  ThreadPoolSpinningSwitch(const ThreadPoolSpinningSwitch&) = delete;
  ThreadPoolSpinningSwitch& operator=(const ThreadPoolSpinningSwitch&) = delete;
  ThreadPoolSpinningSwitch(ThreadPoolSpinningSwitch&&) = delete;
  ThreadPoolSpinningSwitch& operator=(ThreadPoolSpinningSwitch&&) = delete;

  ~ThreadPoolSpinningSwitch() {
    // Close the parallel section before spinning may be switched off.
    ps_intra_.reset();
    // Only the guard that brings the counter back to zero turns spinning off.
    if (1 == counter_ref_.fetch_sub(1, std::memory_order_acq_rel)) {
      if (intra_tp_) intra_tp_->DisableSpinning();
      if (inter_tp_) inter_tp_->DisableSpinning();
    }
  }
};

// When spinning must stop between runs, hold a ThreadPoolSpinningSwitch for
// the remainder of this scope. The pools are the per-session ones when
// use_per_session_threads_ is set, otherwise the ones taken from the environment.
std::optional<ThreadPoolSpinningSwitch> tp_starter;
if (force_spinning_stop_between_runs_) {
  concurrency::ThreadPool* intra_tp = (use_per_session_threads_) ? thread_pool_.get() : intra_op_thread_pool_from_env_;
  concurrency::ThreadPool* inter_tp = (use_per_session_threads_) ? inter_op_thread_pool_.get() : inter_op_thread_pool_from_env_;
  tp_starter.emplace(intra_tp, inter_tp, invocation_refcounter_);
}
yuslepukhin marked this conversation as resolved.
Show resolved Hide resolved
ORT_CHECK_AND_SET_RETVAL(utils::ExecuteGraph(*session_state_, feeds_fetches_manager, feeds, *p_fetches,
session_options_.execution_mode, run_options.terminate, run_logger,
run_options.only_execute_path_to_fetches));
Expand Down
11 changes: 11 additions & 0 deletions onnxruntime/core/session/inference_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,17 @@ class InferenceSession {
std::basic_string<ORTCHAR_T> thread_pool_name_;
std::basic_string<ORTCHAR_T> inter_thread_pool_name_;

// This counter is incremented each time Run() is issued and decremented each time
// Run() exits. For concurrent executions this reference counter will be greater than 1.
// It is currently used to find out whether there is a Run() call in progress
// so that the thread-pools' spinning policies can be adjusted.
std::atomic_int32_t invocation_refcounter_{0};
pranavsharma marked this conversation as resolved.
Show resolved Hide resolved
// This option decreases CPU usage between infrequent requests by
// forcing any spinning thread-pool threads to stop immediately when the last of
// the concurrent Run() calls returns.
// Spinning is restarted on the next Run().
bool force_spinning_stop_between_runs_ = false;

std::unique_ptr<onnxruntime::concurrency::ThreadPool> thread_pool_;
std::unique_ptr<onnxruntime::concurrency::ThreadPool> inter_op_thread_pool_;

Expand Down