Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow saving on CPU usage for infrequent inference requests by reducing thread spinning #11841

Merged
merged 5 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,146 changes: 581 additions & 565 deletions include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions include/onnxruntime/core/platform/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ class ThreadPool {
"Per-thread state should be trivially destructible");
};

// The APIs below enable and disable thread spinning in the pool.
// They support real-time scenarios where spinning between relatively
// infrequent requests contributes to high CPU usage while no work
// is being processed.
void EnableSpinning();

void DisableSpinning();

// Schedules fn() for execution in the pool of threads. The function may run
// synchronously if it cannot be enqueued. This will occur if the thread pool's
// degree-of-parallelism is 1, but it may also occur for implementation-dependent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ static const char* const kOrtSessionOptionsConfigNnapiEpPartitioningStopOps = "e
// Available since version 1.11.
static const char* const kOrtSessionOptionsConfigDynamicBlockBase = "session.dynamic_block_base";

// This option allows CPU usage to be reduced between infrequent
// requests: it forces any spinning thread-pool threads to stop
// immediately when the last of the concurrent Run() calls returns.
// Spinning is restarted on the next Run() call.
// Applies only to the internal thread pools.
static const char* const kOrtSessionOptionsConfigForceSpinningStop = "session.force_spinning_stop";

// "1": all inconsistencies encountered during shape and type inference
// will result in failures.
// "0": in some cases warnings will be logged but processing will continue. The default.
Expand Down
12 changes: 12 additions & 0 deletions onnxruntime/core/common/threadpool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,18 @@ std::string ThreadPool::StopProfiling(concurrency::ThreadPool* tp) {
}
}

// Re-enables worker-thread spinning. Forwards to the underlying
// Eigen-based thread pool when one is present; otherwise a no-op.
void ThreadPool::EnableSpinning() {
  if (extended_eigen_threadpool_ == nullptr) {
    return;
  }
  extended_eigen_threadpool_->EnableSpinning();
}

// Stops worker-thread spinning. Forwards to the underlying
// Eigen-based thread pool when one is present; otherwise a no-op.
void ThreadPool::DisableSpinning() {
  if (extended_eigen_threadpool_ == nullptr) {
    return;
  }
  extended_eigen_threadpool_->DisableSpinning();
}

// Return the number of threads created by the pool.
int ThreadPool::NumThreads() const {
if (underlying_threadpool_) {
Expand Down
6 changes: 0 additions & 6 deletions onnxruntime/core/providers/cpu/rnn/deep_cpu_gru.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,12 +594,6 @@ void UniDirectionalGru<T>::Compute(const gsl::span<const T>& inputs_arg,
}

{
// Enter a parallel section encompassing the kernels invoked
// below. This lets the runtime system amortize loop entry/exit
// costs over a series of short kernels, and promotes cache
// affinity between iterations of successive loops.
onnxruntime::concurrency::ThreadPool::ParallelSection ps(ttp_);
yuslepukhin marked this conversation as resolved.
Show resolved Hide resolved

// for each item in sequence run all calculations
for (int step = 0; step < max_sequence_length; step++) {
#if defined(DUMP_MATRIXES)
Expand Down
56 changes: 52 additions & 4 deletions onnxruntime/core/session/inference_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ void InferenceSession::ConstructorCommon(const SessionOptions& session_options,
}

use_per_session_threads_ = session_options.use_per_session_threads;
force_spinning_stop_between_runs_ = session_options_.config_options.GetConfigOrDefault(kOrtSessionOptionsConfigForceSpinningStop, "0") == "1";

if (use_per_session_threads_) {
LOGS(*session_logger_, INFO) << "Creating and using per session threadpools since use_per_session_threads_ is true";
Expand Down Expand Up @@ -1826,6 +1827,45 @@ Status InferenceSession::PartialRun(onnxruntime::RunOptions& run_options,
}
#endif

namespace {
// RAII guard that counts concurrent Run() invocations and controls
// thread-pool spinning: spinning is enabled when the first concurrent
// run starts and disabled again when the last one finishes.
struct ThreadPoolSpinningSwitch {
  using PS = onnxruntime::concurrency::ThreadPool::ParallelSection;
  concurrency::ThreadPool* intra_tp_{nullptr};
  concurrency::ThreadPool* inter_tp_{nullptr};
  std::atomic<int>& concurrent_num_runs_;
  // Entering a ParallelSection here jump-starts the worker threads and
  // amortizes their initialization cost across the kernels run below.
  // Note: this prevents using an explicit ParallelSection in the nodes,
  // and no additional ParallelSection is entered for the inter-op pool.
  std::optional<PS> ps_intra_;

  // The guard uniquely owns one unit of the run count; copying would
  // corrupt the count, so copy construction/assignment are forbidden.
  ThreadPoolSpinningSwitch(const ThreadPoolSpinningSwitch&) = delete;
  ThreadPoolSpinningSwitch& operator=(const ThreadPoolSpinningSwitch&) = delete;

  // Ctor: ref-counting only. Bumps the concurrent-run counter without
  // touching spinning (used when spinning control is not configured).
  explicit ThreadPoolSpinningSwitch(std::atomic<int>& ref) : concurrent_num_runs_(ref) {
    concurrent_num_runs_.fetch_add(1, std::memory_order_relaxed);
  }
  // Ctor: ref-counting plus spinning control. Enables spinning on both
  // pools when this is the first concurrent run, then enters the
  // intra-op ParallelSection (if an intra-op pool exists).
  ThreadPoolSpinningSwitch(concurrency::ThreadPool* intra_tp,
                           concurrency::ThreadPool* inter_tp,
                           std::atomic<int>& ref) noexcept
      : intra_tp_(intra_tp), inter_tp_(inter_tp), concurrent_num_runs_(ref) {
    if (concurrent_num_runs_.fetch_add(1, std::memory_order_relaxed) == 0) {
      if (intra_tp_) intra_tp_->EnableSpinning();
      if (inter_tp_) inter_tp_->EnableSpinning();
    }
    if (intra_tp_) {
      ps_intra_.emplace(intra_tp_);
    }
  }
  ~ThreadPoolSpinningSwitch() {
    // Exit the parallel section before (possibly) disabling spinning so the
    // pool is quiesced in the right order.
    ps_intra_.reset();
    // acq_rel: the last run to leave must observe all prior runs' effects
    // before it turns spinning off.
    if (1 == concurrent_num_runs_.fetch_sub(1, std::memory_order_acq_rel)) {
      if (intra_tp_) intra_tp_->DisableSpinning();
      if (inter_tp_) inter_tp_->DisableSpinning();
    }
  }
};
}  // namespace

Status InferenceSession::Run(const RunOptions& run_options,
const std::vector<std::string>& feed_names, const std::vector<OrtValue>& feeds,
const std::vector<std::string>& output_names, std::vector<OrtValue>* p_fetches,
Expand All @@ -1843,12 +1883,22 @@ Status InferenceSession::Run(const RunOptions& run_options,
Status retval = Status::OK();
const Env& env = Env::Default();

// Increment/decrement concurrent_num_runs_ and control
// threads spinning as configured. Do nothing for graph replay except the counter
std::optional<ThreadPoolSpinningSwitch> tp_starter;
if (force_spinning_stop_between_runs_ && !cached_execution_provider_for_graph_replay_.IsGraphCaptured()) {
concurrency::ThreadPool* intra_tp_ = (use_per_session_threads_) ? thread_pool_.get() : intra_op_thread_pool_from_env_;
concurrency::ThreadPool* inter_tp = (use_per_session_threads_) ? inter_op_thread_pool_.get() : inter_op_thread_pool_from_env_;
tp_starter.emplace(intra_tp_, inter_tp, current_num_runs_);
} else {
tp_starter.emplace(current_num_runs_);
}

// Check if this Run() is simply going to be a CUDA Graph replay.
if (cached_execution_provider_for_graph_replay_.IsGraphCaptured()) {
LOGS(*session_logger_, INFO) << "Replaying the captured "
<< cached_execution_provider_for_graph_replay_.Type()
<< " CUDA Graph for this model with tag: " << run_options.run_tag;
++current_num_runs_;
ORT_RETURN_IF_ERROR_SESSIONID_(cached_execution_provider_for_graph_replay_.ReplayGraph());
} else {
std::vector<IExecutionProvider*> exec_providers_to_stop;
Expand Down Expand Up @@ -1893,8 +1943,6 @@ Status InferenceSession::Run(const RunOptions& run_options,
LOGS(*session_logger_, INFO) << "Running with tag: " << run_options.run_tag;
}

++current_num_runs_;

// scope of owned_run_logger is just the call to Execute.
// If Execute ever becomes async we need a different approach
std::unique_ptr<logging::Logger> owned_run_logger;
Expand Down Expand Up @@ -1930,6 +1978,7 @@ Status InferenceSession::Run(const RunOptions& run_options,
#ifdef DEBUG_NODE_INPUTS_OUTPUTS
session_state_->IncrementGraphExecutionCounter();
#endif

ORT_CHECK_AND_SET_RETVAL(utils::ExecuteGraph(*session_state_, feeds_fetches_manager, feeds, *p_fetches,
session_options_.execution_mode, run_options.terminate, run_logger,
run_options.only_execute_path_to_fetches));
Expand All @@ -1953,7 +2002,6 @@ Status InferenceSession::Run(const RunOptions& run_options,
ShrinkMemoryArenas(arenas_to_shrink);
}
}
--current_num_runs_;

// keep track of telemetry
++telemetry_.total_runs_since_last_;
Expand Down
6 changes: 6 additions & 0 deletions onnxruntime/core/session/inference_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,12 @@ class InferenceSession {
std::basic_string<ORTCHAR_T> thread_pool_name_;
std::basic_string<ORTCHAR_T> inter_thread_pool_name_;

// This option allows CPU usage to be reduced between infrequent
// requests: it forces any spinning TP threads to stop immediately
// when the last of the concurrent ExecuteGraph() calls returns.
// Spinning is restarted on the next Run().
bool force_spinning_stop_between_runs_ = false;

std::unique_ptr<onnxruntime::concurrency::ThreadPool> thread_pool_;
std::unique_ptr<onnxruntime::concurrency::ThreadPool> inter_op_thread_pool_;

Expand Down