diff --git a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h index 90c4329202db..82573cea5a94 100644 --- a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h +++ b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h @@ -132,7 +132,7 @@ // // - When entering a parallel loop (or parallel section), a thread // maintains a set of "preferred" worker hints, and initially -// submits tasks to these workers. +// submits tasks to these workers. // When a task executes, it updates the submitting thread's // preferred workers to reflect the worker that the task ran on. // Hence, if a task is submitted to thread T1's queue, and then @@ -265,14 +265,14 @@ class ThreadPoolProfiler { std::string Reset(); }; bool enabled_ = false; - MainThreadStat& GetMainThreadStat(); //return thread local stat + MainThreadStat& GetMainThreadStat(); //return thread local stat int num_threads_; struct ChildThreadStat { std::thread::id thread_id_; uint64_t num_run_ = 0; onnxruntime::TimePoint last_logged_point_ = Clock::now(); - int32_t core_ = -1; //core that the child thread is running on - PaddingToAvoidFalseSharing padding_; //to prevent false sharing + int32_t core_ = -1; //core that the child thread is running on + PaddingToAvoidFalseSharing padding_; //to prevent false sharing }; std::vector child_thread_stats_; std::string thread_pool_name_; @@ -288,9 +288,9 @@ class ExtendedThreadPoolInterface : public Eigen::ThreadPoolInterface { // Start/end a parallel section, within which calls to // RunInParallelSection may be made. Parallel sections are // non-nesting. - virtual std::unique_ptr AllocateParallelSection() = 0; - virtual void StartParallelSection(ThreadPoolParallelSection &ps) = 0; - virtual void EndParallelSection(ThreadPoolParallelSection &ps) = 0; + virtual std::unique_ptr AllocateParallelSection() = 0; + virtual void StartParallelSection(ThreadPoolParallelSection& ps) = 0; + virtual void EndParallelSection(ThreadPoolParallelSection& ps) = 0; // Run fn with up to n degree-of-parallelism enlisting the thread // pool for help. The degree-of-parallelism includes the caller, @@ -302,7 +302,7 @@ class ExtendedThreadPoolInterface : public Eigen::ThreadPoolInterface { // // The parameter idx provides a loop-local thread ID in the range // [0,k) where k<=n. - virtual void RunInParallelSection(ThreadPoolParallelSection &ps, + virtual void RunInParallelSection(ThreadPoolParallelSection& ps, std::function fn, unsigned n, std::ptrdiff_t block_size) = 0; @@ -322,11 +322,10 @@ class ExtendedThreadPoolInterface : public Eigen::ThreadPoolInterface { // two loops execute in series in a parallel section. ] virtual void RunInParallel(std::function fn, unsigned n, std::ptrdiff_t block_size) = 0; - virtual void StartProfiling() = 0; + virtual void StartProfiling() = 0; virtual std::string StopProfiling() = 0; }; - class ThreadPoolParallelSection { public: // State accessed only by the main thread @@ -334,7 +333,7 @@ class ThreadPoolParallelSection { // Tasks successfully submitted to the work queues. This sets the // maximum degree of parallelism that the section will support. - std::vector> tasks; + std::vector> tasks; // Number of tasks revoked (i.e., removed from the queues prior to // execution). 
We count this at various points, and omit waiting @@ -368,11 +367,11 @@ class ThreadPoolParallelSection { // // - Writers wishing to deallocate *current_loop must first clear // current_loop and then wait for workers_in_loop==0 - std::atomic current_loop{nullptr}; + std::atomic current_loop{nullptr}; std::atomic workers_in_loop{0}; // Members to track asynchronous dispatching - int dispatch_q_idx = -1; // index of thread that dispatch work to all other threads + int dispatch_q_idx = -1; // index of thread that dispatch work to all other threads unsigned dispatch_w_idx = 0; // index of enqueued work std::atomic dispatch_started{false}; std::atomic dispatch_done{false}; @@ -381,8 +380,8 @@ class ThreadPoolParallelSection { class ThreadPoolLoop { public: - ThreadPoolLoop(std::function f, unsigned t) : fn(std::move(f)), threads_needed(t) { - } + ThreadPoolLoop(std::function f, unsigned t) : fn(std::move(f)), threads_needed(t) { + } const std::function fn; const unsigned threads_needed; @@ -410,7 +409,7 @@ class RunQueue { // If the queue was empty returns default-constructed Work. Work PopFront() { unsigned front; - Elem *e; + Elem* e; ElemState s; // Drain revoked items from the front of the queue. CAS to busy to synchronize with @@ -458,20 +457,20 @@ class RunQueue { return Work(); } - // PushBackWithTag adds w at the end of the queue. The tag value can be used on a + // PushBackWithTag adds w at the end of the queue. The tag value can be used on a // subsequent call to RevokeWithTag to remove the item from the queue in combination // with w_idx. Typically the tag will be a per-thread ID to distinguish work // submitted from different threads. - PushResult PushBackWithTag(Work w, Tag tag, unsigned &w_idx) { + PushResult PushBackWithTag(Work w, Tag tag, unsigned& w_idx) { std::unique_lock lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); - w_idx = (back-1) & kMask; + w_idx = (back - 1) & kMask; Elem& e = array_[w_idx]; ElemState s = e.state.load(std::memory_order_relaxed); if (s != ElemState::kEmpty || !e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) return PushResult::REJECTED; /* Not enqueued */ - bool was_ready = (((back^(front_.load(std::memory_order_relaxed)))&kMask) == 0); + bool was_ready = (((back ^ (front_.load(std::memory_order_relaxed))) & kMask) == 0); back = ((back - 1) & kMask2) | (back & ~kMask2); back_.store(back, std::memory_order_relaxed); e.w = std::move(w); @@ -486,7 +485,7 @@ class RunQueue { return Work(); std::unique_lock lock(mutex_); unsigned back; - Elem *e; + Elem* e; ElemState s; // Drain revoked items from the back of the queue. CAS to busy to synchronize with @@ -513,10 +512,10 @@ class RunQueue { } // RevokeItem removes a work item from the queue. Items are identified positionally, - // and so a tag is used to detect whether the same position is occupied by a + // and so a tag is used to detect whether the same position is occupied by a // different work item at the time of removal. RevokeWithTags lets threads offer work - // for parallel execution, and then revoke the offer prior to the work executing (for - // instance if the thread itself completes all of the work). Revoking the work + // for parallel execution, and then revoke the offer prior to the work executing (for + // instance if the thread itself completes all of the work). Revoking the work // lets the thread deallocate state that might otherwise have been captured by the work item // and accessed by it. 
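For readers unfamiliar with the tag/index protocol described in the comment above, here is a minimal, self-contained sketch of the offer-then-revoke pattern. MiniQueue, Slot and the main() driver are simplified, hypothetical stand-ins (not ORT's lock-protected RunQueue); only the semantics mirrored from this patch are that PushBackWithTag hands back a slot index and RevokeWithTag succeeds only while that same (tag, index) pair still holds unexecuted work.

// Simplified model of push-with-tag / revoke-with-tag. A real worker would
// CAS a slot from kReady to a "running" state before executing it, which is
// what makes a successful revoke a guarantee that the work never ran.
#include <atomic>
#include <cstdio>
#include <functional>
#include <vector>

enum class SlotState { kEmpty, kReady, kRevoked };

struct Slot {
  std::atomic<SlotState> state{SlotState::kEmpty};
  unsigned tag = 0;                // identifies the submitting thread
  std::function<void()> work;
};

class MiniQueue {
 public:
  explicit MiniQueue(size_t n) : slots_(n) {}

  // Enqueue work, returning the slot index so the submitter can revoke later.
  bool PushBackWithTag(std::function<void()> w, unsigned tag, unsigned& w_idx) {
    for (unsigned i = 0; i < slots_.size(); ++i) {
      SlotState expected = SlotState::kEmpty;
      if (slots_[i].state.compare_exchange_strong(expected, SlotState::kReady)) {
        slots_[i].tag = tag;
        slots_[i].work = std::move(w);
        w_idx = i;
        return true;
      }
    }
    return false;  // queue full: the caller runs the work itself
  }

  // Succeeds only if the same (tag, index) pair still holds unexecuted work.
  bool RevokeWithTag(unsigned tag, unsigned w_idx) {
    Slot& s = slots_[w_idx];
    SlotState expected = SlotState::kReady;
    return s.tag == tag && s.state.compare_exchange_strong(expected, SlotState::kRevoked);
  }

 private:
  std::vector<Slot> slots_;
};

int main() {
  MiniQueue q(4);
  unsigned tag = 1, w_idx = 0;
  if (q.PushBackWithTag([] { std::puts("ran on a worker"); }, tag, w_idx)) {
    // ...the submitting thread finishes the work itself, so withdraw the offer...
    if (q.RevokeWithTag(tag, w_idx)) std::puts("revoked before execution");
  }
  return 0;
}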
// @@ -631,12 +630,12 @@ class RunQueue { if (NeedSizeEstimate) { return CalculateSize(front, back); } - // This value will be 0 if the queue is empty, and undefined otherwise. - unsigned maybe_zero = ((front ^ back) & kMask2); - // Queue size estimate must agree with maybe zero check on the queue - // empty/non-empty state. - eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0)); - return maybe_zero; + // This value will be 0 if the queue is empty, and undefined otherwise. + unsigned maybe_zero = ((front ^ back) & kMask2); + // Queue size estimate must agree with maybe zero check on the queue + // empty/non-empty state. + eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0)); + return maybe_zero; } } @@ -663,7 +662,6 @@ static std::atomic next_tag{1}; template class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInterface { - private: struct PerThread; @@ -677,7 +675,6 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter ThreadPoolProfiler profiler_; public: - void StartProfiling() override { profiler_.Start(); } @@ -772,7 +769,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter void Schedule(std::function fn) override { PerThread* pt = GetPerThread(); int q_idx = Rand(&pt->rand) % num_threads_; - WorkerData &td = worker_data_[q_idx]; + WorkerData& td = worker_data_[q_idx]; Queue& q = td.queue; fn = q.PushBack(std::move(fn)); if (!fn) { @@ -784,511 +781,517 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter } } -//...................................................................... -// -// Parallel sections -// ----------------- -// -// Allocate a new ThreadPoolParallelSection, owned by the returned -// unique_ptr. The explicit deleter avoids the Eigen-specific -// definition of ThreadPoolParallelSection needing to be avilable in -// threadpool.h where the user-facing parallel section API is defined. -GSL_SUPPRESS(r.11) -std::unique_ptr AllocateParallelSection() override { - return std::unique_ptr - (new ThreadPoolParallelSection, - [](ThreadPoolParallelSection *tps) { - delete tps; - }); -} - -// Start a parallel section, using a caller-provided -// ThreadPoolParallelSection for maintaining the per-section state. -// Starting a parallel section is just book-keeping; threads are -// "summoned" to help with the parallel section once it enters -// parallel loops. The threads are then retained until the end of the -// section, being re-used over subsequent loops. - -void StartParallelSectionInternal(PerThread &pt, - ThreadPoolParallelSection &ps) { - assert((!pt.leading_par_section) && "Nested parallelism not supported"); - assert((!ps.active) && "Starting parallel section, but active already"); - pt.leading_par_section = true; - if (!pt.tag.Get()) { - pt.tag = Tag::GetNext(); + //...................................................................... + // + // Parallel sections + // ----------------- + // + // Allocate a new ThreadPoolParallelSection, owned by the returned + // unique_ptr. The explicit deleter avoids the Eigen-specific + // definition of ThreadPoolParallelSection needing to be avilable in + // threadpool.h where the user-facing parallel section API is defined. 
+ GSL_SUPPRESS(r .11) + std::unique_ptr AllocateParallelSection() override { + return std::unique_ptr(new ThreadPoolParallelSection, + [](ThreadPoolParallelSection* tps) { + delete tps; + }); } - ps.dispatch_q_idx = -1; - ps.dispatch_started = false; - ps.dispatch_done = false; - ps.work_done = false; - ps.tasks_revoked = 0; - ps.current_dop = 1; - ps.active = true; -} - -void StartParallelSection(ThreadPoolParallelSection &ps) override { - PerThread* pt = GetPerThread(); - StartParallelSectionInternal(*pt, ps); -} - -// End a parallel section, waiting for all worker threads to exit from -// section. Hence, on return, the ThreadPoolParallelSection object -// can be dealloacted. -void EndParallelSectionInternal(PerThread &pt, - ThreadPoolParallelSection &ps) { - assert((pt.leading_par_section) && "Ending parallel section, but none started"); - assert((ps.active) && "Ending parallel section, but not active"); - pt.leading_par_section = false; - - // Notify workers to exit from the section - ps.active = false; - - // First, attempt to revoke the dispatch task. If we succeed then - // we know we revoked _something_ pushed for the current loop. That - // may be the dispatch task itself, or it may be a task pushed by - // the dispatch task. Those cases are distinguished by whether or - // not the dispatch task itself has started -- if it has not started - // then it cannot have pushed tasks. - if (ps.dispatch_q_idx != -1) { - Queue& q = worker_data_[ps.dispatch_q_idx].queue; - if (q.RevokeWithTag(pt.tag, ps.dispatch_w_idx)) { - if (!ps.dispatch_started.load(std::memory_order_acquire)) { - // We successfully revoked a task, and saw the dispatch task - // not started. Hence we know we revoked the dispatch task. - // This should be the common case. - ps.dispatch_q_idx = -1; - } else { - // We successfully revoked a task, but saw the dispatch task - // had started. Hence we know we revoked one of the _new_ - // tasks created by the dispatcher (not the dispatcher - // itself). This should be the rare case, but can occur if - // one of the tasks created by the dispatcher occupies the - // exact same slot in a work queue that the dispatcher used. - ps.tasks_revoked ++; - } + + // Start a parallel section, using a caller-provided + // ThreadPoolParallelSection for maintaining the per-section state. + // Starting a parallel section is just book-keeping; threads are + // "summoned" to help with the parallel section once it enters + // parallel loops. The threads are then retained until the end of the + // section, being re-used over subsequent loops. + + void StartParallelSectionInternal(PerThread& pt, + ThreadPoolParallelSection& ps) { + assert((!pt.leading_par_section) && "Nested parallelism not supported"); + assert((!ps.active) && "Starting parallel section, but active already"); + pt.leading_par_section = true; + if (!pt.tag.Get()) { + pt.tag = Tag::GetNext(); } + ps.dispatch_q_idx = -1; + ps.dispatch_started = false; + ps.dispatch_done = false; + ps.work_done = false; + ps.tasks_revoked = 0; + ps.current_dop = 1; + ps.active = true; } - // Second, if we failed to revoke the dispatch task, wait for it to - // finish dispatch work. This avoids new tasks being started - // concurrently with us attempting to end the parallel section. 
- if (ps.dispatch_q_idx != -1) { - while (!ps.dispatch_done.load(std::memory_order_acquire)) { - onnxruntime::concurrency::SpinPause(); - } + void StartParallelSection(ThreadPoolParallelSection& ps) override { + PerThread* pt = GetPerThread(); + StartParallelSectionInternal(*pt, ps); } - // Now we know that dispatch is finshed, we synchronize with the - // tasks that were created (if any) for the parallel section. We - // revoke tasks still in queues, and then wait for any that are - // still running. - profiler_.LogStart(); - unsigned tasks_started = static_cast(ps.tasks.size()); - while (!ps.tasks.empty()) { - const auto& item = ps.tasks.back(); - Queue& q = worker_data_[item.first].queue; - if (q.RevokeWithTag(pt.tag, item.second)) { - ps.tasks_revoked++; + // End a parallel section, waiting for all worker threads to exit from + // section. Hence, on return, the ThreadPoolParallelSection object + // can be dealloacted. + void EndParallelSectionInternal(PerThread& pt, + ThreadPoolParallelSection& ps) { + assert((pt.leading_par_section) && "Ending parallel section, but none started"); + assert((ps.active) && "Ending parallel section, but not active"); + pt.leading_par_section = false; + + // Notify workers to exit from the section + ps.active = false; + + // First, attempt to revoke the dispatch task. If we succeed then + // we know we revoked _something_ pushed for the current loop. That + // may be the dispatch task itself, or it may be a task pushed by + // the dispatch task. Those cases are distinguished by whether or + // not the dispatch task itself has started -- if it has not started + // then it cannot have pushed tasks. + if (ps.dispatch_q_idx != -1) { + Queue& q = worker_data_[ps.dispatch_q_idx].queue; + if (q.RevokeWithTag(pt.tag, ps.dispatch_w_idx)) { + if (!ps.dispatch_started.load(std::memory_order_acquire)) { + // We successfully revoked a task, and saw the dispatch task + // not started. Hence we know we revoked the dispatch task. + // This should be the common case. + ps.dispatch_q_idx = -1; + } else { + // We successfully revoked a task, but saw the dispatch task + // had started. Hence we know we revoked one of the _new_ + // tasks created by the dispatcher (not the dispatcher + // itself). This should be the rare case, but can occur if + // one of the tasks created by the dispatcher occupies the + // exact same slot in a work queue that the dispatcher used. + ps.tasks_revoked++; + } + } } - ps.tasks.pop_back(); - } - profiler_.LogEnd(ThreadPoolProfiler::WAIT_REVOKE); - // Wait for the dispatch task's own work... - if (ps.dispatch_q_idx > -1) { - while (!ps.work_done.load(std::memory_order_acquire)) { + // Second, if we failed to revoke the dispatch task, wait for it to + // finish dispatch work. This avoids new tasks being started + // concurrently with us attempting to end the parallel section. + if (ps.dispatch_q_idx != -1) { + while (!ps.dispatch_done.load(std::memory_order_acquire)) { + onnxruntime::concurrency::SpinPause(); + } + } + + // Now we know that dispatch is finshed, we synchronize with the + // tasks that were created (if any) for the parallel section. We + // revoke tasks still in queues, and then wait for any that are + // still running. 
+ profiler_.LogStart(); + unsigned tasks_started = static_cast(ps.tasks.size()); + while (!ps.tasks.empty()) { + const auto& item = ps.tasks.back(); + Queue& q = worker_data_[item.first].queue; + if (q.RevokeWithTag(pt.tag, item.second)) { + ps.tasks_revoked++; + } + ps.tasks.pop_back(); + } + profiler_.LogEnd(ThreadPoolProfiler::WAIT_REVOKE); + + // Wait for the dispatch task's own work... + if (ps.dispatch_q_idx > -1) { + while (!ps.work_done.load(std::memory_order_acquire)) { + onnxruntime::concurrency::SpinPause(); + } + } + + // ...and wait for any other tasks not revoked to finish their work + auto tasks_to_wait_for = tasks_started - ps.tasks_revoked; + while (ps.tasks_finished < tasks_to_wait_for) { onnxruntime::concurrency::SpinPause(); } + + // Clear status to allow the ThreadPoolParallelSection to be + // re-used. + ps.tasks_finished = 0; } - // ...and wait for any other tasks not revoked to finish their work - auto tasks_to_wait_for = tasks_started - ps.tasks_revoked; - while (ps.tasks_finished < tasks_to_wait_for) { - onnxruntime::concurrency::SpinPause(); + void EndParallelSection(ThreadPoolParallelSection& ps) override { + PerThread* pt = GetPerThread(); + EndParallelSectionInternal(*pt, ps); } - - // Clear status to allow the ThreadPoolParallelSection to be - // re-used. - ps.tasks_finished = 0; -} - -void EndParallelSection(ThreadPoolParallelSection &ps) override { - PerThread* pt = GetPerThread(); - EndParallelSectionInternal(*pt, ps); -} - -//---------------------------------------------------------------------- -// -// Preferred workers -// ----------------- -// -// Initialize the set of hints for preferred worker threads we will -// use. We do this once, covering the maximum num_threads_ items, -// in order to avoid resizing preferred_workers concurrent with -// access from worker threads. -// -// For simplicity we initialize with hints round-robin among the -// workers. For simple workloads with 1 main thread this means we -// will distribute work across the pool of workers. For workers -// with multiple main threads it attempts to balance the load. -// -// These hints are just used as a starting point, and are updated by -// the worker thread that actually claims an item (e.g., if an item -// initially assigned to thread T1 is stolen and executed by T2, -// then T2 is assigned at the new preferred worker). -// -// Note that the hints are held in the _main_ thread that submits -// work to the pool. We assume that a thread is primarily -// submitting work to just one pool, but allow for the pool to -// change over time. Hence we allow the hints vector to grow over -// time. -// -// A note on terminology used in the variable names here: -// -// dop - degree of parallelism, as seen by the user. For instance -// dop=4 means 4 threads in total: 1 main thread that enters the -// loop, plus 1 dispatcher thread, plus 2 additional worker -// threads. -// -// par_idx - a thread's index within the loop, in the range [0,dop). -// -// num_threads_ - the number of worker threads in the thread pool. A -// loop with dop=4 will be common on a pool with 3 threads -// (given that the main thread will also participate). -// -// q_idx - a worker queue index, in the range [0,num_threads_). -// -// preferred_workers - this maps from par_idx values to q_idx. Hence, -// with dop=4 the vector will have length 4, and will identify -// which of the workers (0,1,2) should run tasks for the loop. 
-// Note that mapping from par_idx values means that only slots -// [1,dop) are actually used in preferred_workers. -// -// Here are three examples, all assuming a machine with 4 h/w threads, -// and ORT configured to use dop=4. -// -// * First, suppose that a single job is running a series of loops. -// Its main thread enters a parallel loop. Initially, let's assume -// its preferred worker array is [_,0,1,2], writing "_" for the -// unusued element for the par_idx=0 work that the main thread will -// run. -// -// The main thread schedules the dispatcher task onto worker 0. -// -// The dispatcher task schedules worker tasks onto workers 1 and 2. -// -// The tasks all execute, without any work stealing, on the threads -// they were scheduled on. The preferred worker array remains -// [_,0,1,2]. -// -// * Next, assume we have the same job, and for whatever reason the -// preferred workers were initially [_,0,0,0]. -// -// The main thread schedules the dispatcher onto worker 0. -// -// This dispatcher task runs on worker 0, and pushes the worker -// tasks back onto worker 0's queue. -// -// Workers 1 and 2 are idle, and steal tasks from worker 0. As the -// tasks run, they update the preferred_workers array to record the -// workers that execute them. -// -// After the loop, the preferred worker array may now be [_,0,2,1] -// or [_,0,1,2], reflecting the fact that the work has got -// re-distributed. The next loop will start out by distributing the -// work to those same workers. -// -// * Finally, let's assume we have two jobs running on two main -// threads, and we are now using DoP=2 in the loops, and have 2 -// workers in the thread pool (so the machine is not -// over-subscribed). -// -// Each main thread has its own preferred_workers, and -// let's say initially these are both [_,0]. -// -// Here, with DoP=2, each main thread will just dispatch a single -// task immediately (there is no need for asynchrony with only one -// task to generate). -// -// Initially both main threads will submit these tasks to worker 0. -// -// Once worker 1 steals one of these tasks, the task will update its -// preferred worker to be 1. -// -// From that point onwards, the two main threads will dispatch tasks -// to separate workers, avoiding the need for further work stealing. - -void InitializePreferredWorkers(std::vector &preferred_workers) { - static std::atomic next_worker; - - // preferred_workers[0] isn't supposed to be used, so initializng it with -1 to: - // a) fault if inapropriately accessed - // b) avoid wasting next_worker value - if (preferred_workers.size() == 0) - preferred_workers.push_back(-1); - - // preferred_workers maps from a par_idx to a q_idx, hence we - // initialize slots in the range [0,num_threads_] - while (preferred_workers.size() <= num_threads_) { - preferred_workers.push_back(next_worker++ % num_threads_); + + //---------------------------------------------------------------------- + // + // Preferred workers + // ----------------- + // + // Initialize the set of hints for preferred worker threads we will + // use. We do this once, covering the maximum num_threads_ items, + // in order to avoid resizing preferred_workers concurrent with + // access from worker threads. + // + // For simplicity we initialize with hints round-robin among the + // workers. For simple workloads with 1 main thread this means we + // will distribute work across the pool of workers. For workers + // with multiple main threads it attempts to balance the load. 
+ // + // These hints are just used as a starting point, and are updated by + // the worker thread that actually claims an item (e.g., if an item + // initially assigned to thread T1 is stolen and executed by T2, + // then T2 is assigned at the new preferred worker). + // + // Note that the hints are held in the _main_ thread that submits + // work to the pool. We assume that a thread is primarily + // submitting work to just one pool, but allow for the pool to + // change over time. Hence we allow the hints vector to grow over + // time. + // + // A note on terminology used in the variable names here: + // + // dop - degree of parallelism, as seen by the user. For instance + // dop=4 means 4 threads in total: 1 main thread that enters the + // loop, plus 1 dispatcher thread, plus 2 additional worker + // threads. + // + // par_idx - a thread's index within the loop, in the range [0,dop). + // + // num_threads_ - the number of worker threads in the thread pool. A + // loop with dop=4 will be common on a pool with 3 threads + // (given that the main thread will also participate). + // + // q_idx - a worker queue index, in the range [0,num_threads_). + // + // preferred_workers - this maps from par_idx values to q_idx. Hence, + // with dop=4 the vector will have length 4, and will identify + // which of the workers (0,1,2) should run tasks for the loop. + // Note that mapping from par_idx values means that only slots + // [1,dop) are actually used in preferred_workers. + // + // Here are three examples, all assuming a machine with 4 h/w threads, + // and ORT configured to use dop=4. + // + // * First, suppose that a single job is running a series of loops. + // Its main thread enters a parallel loop. Initially, let's assume + // its preferred worker array is [_,0,1,2], writing "_" for the + // unusued element for the par_idx=0 work that the main thread will + // run. + // + // The main thread schedules the dispatcher task onto worker 0. + // + // The dispatcher task schedules worker tasks onto workers 1 and 2. + // + // The tasks all execute, without any work stealing, on the threads + // they were scheduled on. The preferred worker array remains + // [_,0,1,2]. + // + // * Next, assume we have the same job, and for whatever reason the + // preferred workers were initially [_,0,0,0]. + // + // The main thread schedules the dispatcher onto worker 0. + // + // This dispatcher task runs on worker 0, and pushes the worker + // tasks back onto worker 0's queue. + // + // Workers 1 and 2 are idle, and steal tasks from worker 0. As the + // tasks run, they update the preferred_workers array to record the + // workers that execute them. + // + // After the loop, the preferred worker array may now be [_,0,2,1] + // or [_,0,1,2], reflecting the fact that the work has got + // re-distributed. The next loop will start out by distributing the + // work to those same workers. + // + // * Finally, let's assume we have two jobs running on two main + // threads, and we are now using DoP=2 in the loops, and have 2 + // workers in the thread pool (so the machine is not + // over-subscribed). + // + // Each main thread has its own preferred_workers, and + // let's say initially these are both [_,0]. + // + // Here, with DoP=2, each main thread will just dispatch a single + // task immediately (there is no need for asynchrony with only one + // task to generate). + // + // Initially both main threads will submit these tasks to worker 0. 
+ // + // Once worker 1 steals one of these tasks, the task will update its + // preferred worker to be 1. + // + // From that point onwards, the two main threads will dispatch tasks + // to separate workers, avoiding the need for further work stealing. + + void InitializePreferredWorkers(std::vector& preferred_workers) { + static std::atomic next_worker{0}; + + // preferred_workers[0] isn't supposed to be used, so initializing it with -1 to: + // a) fault if inappropriately accessed + // b) avoid wasting next_worker value + if (preferred_workers.empty()) { + preferred_workers.push_back(-1); + } + + // preferred_workers maps from a par_idx to a q_idx, hence we + // initialize slots in the range [0,num_threads_] + while (preferred_workers.size() <= num_threads_) { + preferred_workers.push_back(next_worker++ % num_threads_); + } } -} - -// Update the preferred worker for par_idx to be the calling thread - -void UpdatePreferredWorker(std::vector &preferred_workers, - unsigned par_idx) { - unsigned ran_on_idx = GetPerThread()->thread_id; - assert(ran_on_idx < num_threads_); - assert(par_idx < preferred_workers.size()); - preferred_workers[par_idx] = ran_on_idx; -} - -// Schedule [par_idx_start,par_idx_end) across the preferred workers - -void ScheduleOnPreferredWorkers(PerThread& pt, - ThreadPoolParallelSection& ps, - std::vector &preferred_workers, - unsigned par_idx_start, - unsigned par_idx_end, - std::function worker_fn) { - for (auto par_idx = par_idx_start; par_idx < par_idx_end; ++par_idx) { - // Look up hint for par_idx. Note that the hints may have been - // recorded from a prior thread pool with a different number of - // threads, hence we must cap at num_threads_. + + // Update the preferred worker for par_idx to be the calling thread + + void UpdatePreferredWorker(std::vector& preferred_workers, + unsigned par_idx) { + unsigned ran_on_idx = GetPerThread()->thread_id; + assert(ran_on_idx < num_threads_); assert(par_idx < preferred_workers.size()); - unsigned q_idx = preferred_workers[par_idx] % num_threads_; - assert(q_idx < num_threads_); - WorkerData& td = worker_data_[q_idx]; - Queue& q = td.queue; - unsigned w_idx; + preferred_workers[par_idx] = ran_on_idx; + } - // Attempt to enqueue the task - auto push_status = q.PushBackWithTag([worker_fn, par_idx, &preferred_workers, &ps, this]() { + // Schedule [par_idx_start,par_idx_end) across the preferred workers + + void ScheduleOnPreferredWorkers(PerThread& pt, + ThreadPoolParallelSection& ps, + std::vector& preferred_workers, + unsigned par_idx_start, + unsigned par_idx_end, + std::function worker_fn) { + for (auto par_idx = par_idx_start; par_idx < par_idx_end; ++par_idx) { + // Look up hint for par_idx. Note that the hints may have been + // recorded from a prior thread pool with a different number of + // threads, hence we must cap at num_threads_. + assert(par_idx < preferred_workers.size()); + unsigned q_idx = preferred_workers[par_idx] % num_threads_; + assert(q_idx < num_threads_); + WorkerData& td = worker_data_[q_idx]; + Queue& q = td.queue; + unsigned w_idx; + + // Attempt to enqueue the task + auto push_status = q.PushBackWithTag([worker_fn, par_idx, &preferred_workers, &ps, this]() { // Record the worker thread that actually runs this task. // This will form the preferred worker for the next loop. UpdatePreferredWorker(preferred_workers, par_idx); worker_fn(par_idx); ps.tasks_finished++; }, - pt.tag, - w_idx); - - // Queue accepted the task; wake the thread that owns the queue. 
- // In addition, if the queue was non-empty, attempt to wake - // another thread (which may then steal the task). - if (push_status == PushResult::ACCEPTED_IDLE || push_status == PushResult::ACCEPTED_BUSY) { - ps.tasks.push_back({q_idx, w_idx}); - td.EnsureAwake(); - if (push_status == PushResult::ACCEPTED_BUSY) { - worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake(); - } - } - } -} + pt.tag, w_idx); -//...................................................................... -// -// Parallel loops -// -------------- -// -// Ensure that the ThreadPoolParallelSection has sufficient workers to -// execute a loop with degree of parallelism n. We track the number -// of workers already avaiable to the parallel section, prior to -// submitting tasks to the work queues to make up the total. -// -// Each worker will call in to worker_fn(idx) with a per-worker thread -// ID. Note there are different levels of indirection here: -// -// - In a single-loop parallel section, worker_fn will directly -// execute the threadpool.cc code that implements the parallel loop. -// -// - In a multi-loop parallel section, worker_fn is an intermediate -// function that is long-lived (i.e., that lasts until the end of -// the parallel section, as opposed to just a single loop's -// duration). -// -// For ordinary parallel sections, RunInParallelInternal dispatch -// tasks to a number of workers asynchronously. A worker thread will -// be selected as the dispatcher that distributes tasks. This removes -// the O(n) work off the critical path of starting the first loop -// iteration, helping maintain good performance on very short loops. -// -// See the note on terminology above for the use of variable names -// here. - -void RunInParallelInternal(PerThread& pt, - ThreadPoolParallelSection& ps, - unsigned new_dop, - bool dispatch_async, - std::function worker_fn) { - - // Ensure that the vector of preferred workers is sufficient for the - // size of the loop we are entering. We do this before dispatching - // tasks for the loop in order to avoid any races between changes to - // the size of the vector and recording the locations that tasks run - // in as they complete. - assert(new_dop <= (unsigned)(num_threads_+1)); - std::vector &preferred_workers = pt.preferred_workers; - InitializePreferredWorkers(preferred_workers); - - // current_dop is the degree of parallelism via any workers already - // participating in the current parallel section. Usually, for - // single-loop parallel sections, current_dop=1. - unsigned current_dop = ps.current_dop; - - if (current_dop < new_dop) { - unsigned extra_needed = new_dop - current_dop; - - // Attempt to summon additional workers asynchronously if we - // need more than one. Otherwise, we fall back to simple - // synchronous scheduling. - if (dispatch_async && extra_needed > 1) { - assert(current_dop == 1); - - // Task for dispatching work asynchronously. - Task dispatch_task = [current_dop, new_dop, worker_fn, &preferred_workers, &ps, &pt, this]() { - // Record that dispatch work has started. This must occur - // prior to scheduling tasks, in order to synchronize with - // EndParallelSectionInternal. [ If EndParallelSection - // revoked a task, and then sees distpatch_started=false, then - // it knows that it revoked the dispatcher. Conversely, if it - // revokes a task, and then sees dispatch_started=true, then - // it knows it revoked a worker task. 
] - ps.dispatch_started.store(true, std::memory_order_seq_cst); - - // Schedule tasks par_idx=[current_dop+1,new_dop) - ScheduleOnPreferredWorkers(pt, ps, preferred_workers, current_dop+1, new_dop, worker_fn); - ps.dispatch_done.store(true, std::memory_order_release); - - // Record the worker thread that actually runs this task. - // This will form the preferred worker for the next loop. - UpdatePreferredWorker(preferred_workers, current_dop); - - // Run dispatcher task's own work, par_idx=current_dop - worker_fn(current_dop); - - // Dispatcher's work complete - ps.work_done.store(true, std::memory_order_release); - }; - - profiler_.LogStart(); - ps.dispatch_q_idx = preferred_workers[current_dop] % num_threads_; - WorkerData& dispatch_td = worker_data_[ps.dispatch_q_idx]; - Queue& dispatch_que = dispatch_td.queue; - - // assign dispatch task to selected dispatcher - auto push_status = dispatch_que.PushBackWithTag(dispatch_task, pt.tag, ps.dispatch_w_idx); // Queue accepted the task; wake the thread that owns the queue. // In addition, if the queue was non-empty, attempt to wake // another thread (which may then steal the task). if (push_status == PushResult::ACCEPTED_IDLE || push_status == PushResult::ACCEPTED_BUSY) { - dispatch_td.EnsureAwake(); + ps.tasks.push_back({q_idx, w_idx}); + td.EnsureAwake(); if (push_status == PushResult::ACCEPTED_BUSY) { worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake(); } - } else { - ps.dispatch_q_idx = -1; // failed to enqueue dispatch_task } - profiler_.LogEnd(ThreadPoolProfiler::DISTRIBUTION_ENQUEUE); - } else { - // Synchronous dispatch - ScheduleOnPreferredWorkers(pt, ps, preferred_workers, current_dop, new_dop, std::move(worker_fn)); } - ps.current_dop = new_dop; } -} - -// Run a single parallel loop in an existing parallel section. This -// maps directly onto SummonWorkers to create sufficient worker -// threads for the desired degree of parallelism, followed by -// dispatching the loop to those workers. -void RunInParallelSection(ThreadPoolParallelSection &ps, - std::function fn, - unsigned n, - std::ptrdiff_t block_size) override { - ORT_ENFORCE(n <= num_threads_+1, "More work items than threads"); - profiler_.LogStartAndCoreAndBlock(block_size); - PerThread* pt = GetPerThread(); - assert(pt->leading_par_section && "RunInParallel, but not in parallel section"); - assert((n > 1) && "Trivial parallel section; should be avoided by caller"); - - // Publish the work to any existing workers in the parallel - // section, and ensure it is visible to any new threads created - // below. - assert((!ps.current_loop) && "RunInParallelSection, but loop already active"); - ThreadPoolLoop loop{std::move(fn), n}; - ps.current_loop = &loop; - - // Increase the worker count if needed. Each worker will pick up - // loops to execute from the current parallel section. - std::function worker_fn = [&ps](unsigned par_idx) { - while (ps.active) { - if (ps.current_loop.load() == nullptr) { - onnxruntime::concurrency::SpinPause(); + + //...................................................................... + // + // Parallel loops + // -------------- + // + // Ensure that the ThreadPoolParallelSection has sufficient workers to + // execute a loop with degree of parallelism n. We track the number + // of workers already avaiable to the parallel section, prior to + // submitting tasks to the work queues to make up the total. + // + // Each worker will call in to worker_fn(idx) with a per-worker thread + // ID. 
Note there are different levels of indirection here: + // + // - In a single-loop parallel section, worker_fn will directly + // execute the threadpool.cc code that implements the parallel loop. + // + // - In a multi-loop parallel section, worker_fn is an intermediate + // function that is long-lived (i.e., that lasts until the end of + // the parallel section, as opposed to just a single loop's + // duration). + // + // For ordinary parallel sections, RunInParallelInternal dispatch + // tasks to a number of workers asynchronously. A worker thread will + // be selected as the dispatcher that distributes tasks. This removes + // the O(n) work off the critical path of starting the first loop + // iteration, helping maintain good performance on very short loops. + // + // See the note on terminology above for the use of variable names + // here. + + void RunInParallelInternal(PerThread& pt, + ThreadPoolParallelSection& ps, + unsigned new_dop, + bool dispatch_async, + std::function worker_fn) { + // Ensure that the vector of preferred workers is sufficient for the + // size of the loop we are entering. We do this before dispatching + // tasks for the loop in order to avoid any races between changes to + // the size of the vector and recording the locations that tasks run + // in as they complete. + assert(new_dop <= (unsigned)(num_threads_ + 1)); + std::vector& preferred_workers = pt.preferred_workers; + InitializePreferredWorkers(preferred_workers); + + // current_dop is the degree of parallelism via any workers already + // participating in the current parallel section. Usually, for + // single-loop parallel sections, current_dop=1. + unsigned current_dop = ps.current_dop; + + if (current_dop < new_dop) { + unsigned extra_needed = new_dop - current_dop; + + // Attempt to summon additional workers asynchronously if we + // need more than one. Otherwise, we fall back to simple + // synchronous scheduling. + if (dispatch_async && extra_needed > 1) { + assert(current_dop == 1); + + // Task for dispatching work asynchronously. + Task dispatch_task = [current_dop, new_dop, worker_fn, &preferred_workers, &ps, &pt, this]() { + // Record that dispatch work has started. This must occur + // prior to scheduling tasks, in order to synchronize with + // EndParallelSectionInternal. [ If EndParallelSection + // revoked a task, and then sees distpatch_started=false, then + // it knows that it revoked the dispatcher. Conversely, if it + // revokes a task, and then sees dispatch_started=true, then + // it knows it revoked a worker task. ] + ps.dispatch_started.store(true, std::memory_order_seq_cst); + + // Schedule tasks par_idx=[current_dop+1,new_dop) + ScheduleOnPreferredWorkers(pt, ps, preferred_workers, current_dop + 1, new_dop, worker_fn); + ps.dispatch_done.store(true, std::memory_order_release); + + // Record the worker thread that actually runs this task. + // This will form the preferred worker for the next loop. 
+ UpdatePreferredWorker(preferred_workers, current_dop); + + // Run dispatcher task's own work, par_idx=current_dop + worker_fn(current_dop); + + // Dispatcher's work complete + ps.work_done.store(true, std::memory_order_release); + }; + + profiler_.LogStart(); + ps.dispatch_q_idx = preferred_workers[current_dop] % num_threads_; + WorkerData& dispatch_td = worker_data_[ps.dispatch_q_idx]; + Queue& dispatch_que = dispatch_td.queue; + + // assign dispatch task to selected dispatcher + auto push_status = dispatch_que.PushBackWithTag(dispatch_task, pt.tag, ps.dispatch_w_idx); + // Queue accepted the task; wake the thread that owns the queue. + // In addition, if the queue was non-empty, attempt to wake + // another thread (which may then steal the task). + if (push_status == PushResult::ACCEPTED_IDLE || push_status == PushResult::ACCEPTED_BUSY) { + dispatch_td.EnsureAwake(); + if (push_status == PushResult::ACCEPTED_BUSY) { + worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake(); + } + } else { + ps.dispatch_q_idx = -1; // failed to enqueue dispatch_task + } + profiler_.LogEnd(ThreadPoolProfiler::DISTRIBUTION_ENQUEUE); } else { - ps.workers_in_loop++; - ThreadPoolLoop *work_item = ps.current_loop; - if (work_item && par_idx < work_item->threads_needed) { - work_item->fn(par_idx); + // Synchronous dispatch + ScheduleOnPreferredWorkers(pt, ps, preferred_workers, current_dop, new_dop, std::move(worker_fn)); + } + ps.current_dop = new_dop; + } + } + + // Run a single parallel loop in an existing parallel section. This + // maps directly onto SummonWorkers to create sufficient worker + // threads for the desired degree of parallelism, followed by + // dispatching the loop to those workers. + void RunInParallelSection(ThreadPoolParallelSection& ps, + std::function fn, + unsigned n, + std::ptrdiff_t block_size) override { + ORT_ENFORCE(n <= num_threads_ + 1, "More work items than threads"); + profiler_.LogStartAndCoreAndBlock(block_size); + PerThread* pt = GetPerThread(); + assert(pt->leading_par_section && "RunInParallel, but not in parallel section"); + assert((n > 1) && "Trivial parallel section; should be avoided by caller"); + + // Publish the work to any existing workers in the parallel + // section, and ensure it is visible to any new threads created + // below. + assert((!ps.current_loop) && "RunInParallelSection, but loop already active"); + ThreadPoolLoop loop{std::move(fn), n}; + ps.current_loop = &loop; + + // Increase the worker count if needed. Each worker will pick up + // loops to execute from the current parallel section. 
+ std::function worker_fn = [&ps](unsigned par_idx) { + while (ps.active) { + if (ps.current_loop.load() == nullptr) { + onnxruntime::concurrency::SpinPause(); + } else { + ps.workers_in_loop++; + ThreadPoolLoop* work_item = ps.current_loop; + if (work_item && par_idx < work_item->threads_needed) { + work_item->fn(par_idx); + } + ps.workers_in_loop--; } - ps.workers_in_loop--; } + }; + RunInParallelInternal(*pt, ps, n, false, std::move(worker_fn)); + assert(ps.dispatch_q_idx == -1); + profiler_.LogEndAndStart(ThreadPoolProfiler::DISTRIBUTION); + + // Run work in the main thread + loop.fn(0); + profiler_.LogEndAndStart(ThreadPoolProfiler::RUN); + + // Wait for workers to exit the loop + ps.current_loop = 0; + while (ps.workers_in_loop) { + onnxruntime::concurrency::SpinPause(); } - }; - RunInParallelInternal(*pt, ps, n, false, std::move(worker_fn)); - assert(ps.dispatch_q_idx == -1); - profiler_.LogEndAndStart(ThreadPoolProfiler::DISTRIBUTION); - - // Run work in the main thread - loop.fn(0); - profiler_.LogEndAndStart(ThreadPoolProfiler::RUN); - - // Wait for workers to exit the loop - ps.current_loop = 0; - while (ps.workers_in_loop) { - onnxruntime::concurrency::SpinPause(); + profiler_.LogEnd(ThreadPoolProfiler::WAIT); + } + + // Run a single parallel loop _without_ a parallel section. This is a + // special case of RunInParallelSection, avoiding code paths for + // handing off multiple loops to the pool of workers. + // For main thread: + // 1. select a dispatcher and do job distribution; + // 2. run fn(0); + // 3, wait for all; + // For dispatcher: + // 1. distribute jobs to all other threads; + // 2. run fn(...) itself. + // For all other threads: + // 1. run fn(...); + void RunInParallel(std::function fn, unsigned n, std::ptrdiff_t block_size) override { + ORT_ENFORCE(n <= num_threads_ + 1, "More work items than threads"); + profiler_.LogStartAndCoreAndBlock(block_size); + PerThread* pt = GetPerThread(); + ThreadPoolParallelSection ps; + StartParallelSectionInternal(*pt, ps); + RunInParallelInternal(*pt, ps, n, true, fn); // select dispatcher and do job distribution; + profiler_.LogEndAndStart(ThreadPoolProfiler::DISTRIBUTION); + fn(0); // run fn(0) + profiler_.LogEndAndStart(ThreadPoolProfiler::RUN); + EndParallelSectionInternal(*pt, ps); // wait for all + profiler_.LogEnd(ThreadPoolProfiler::WAIT); + } + + int NumThreads() const final { + return num_threads_; + } + + int CurrentThreadId() const final { + const PerThread* pt = const_cast(this)->GetPerThread(); + if (pt->pool == this) { + return pt->thread_id; + } + return -1; + } + + void EnableSpinning() { + spin_loop_status_ = SpinLoopStatus::kBusy; } - profiler_.LogEnd(ThreadPoolProfiler::WAIT); -} - -// Run a single parallel loop _without_ a parallel section. This is a -// special case of RunInParallelSection, avoiding code paths for -// handing off multiple loops to the pool of workers. -// For main thread: -// 1. select a dispatcher and do job distribution; -// 2. run fn(0); -// 3, wait for all; -// For dispatcher: -// 1. distribute jobs to all other threads; -// 2. run fn(...) itself. -// For all other threads: -// 1. 
run fn(...); -void RunInParallel(std::function fn, unsigned n, std::ptrdiff_t block_size) override { - ORT_ENFORCE(n <= num_threads_+1, "More work items than threads"); - profiler_.LogStartAndCoreAndBlock(block_size); - PerThread* pt = GetPerThread(); - ThreadPoolParallelSection ps; - StartParallelSectionInternal(*pt, ps); - RunInParallelInternal(*pt, ps, n, true, fn); // select dispatcher and do job distribution; - profiler_.LogEndAndStart(ThreadPoolProfiler::DISTRIBUTION); - fn(0); // run fn(0) - profiler_.LogEndAndStart(ThreadPoolProfiler::RUN); - EndParallelSectionInternal(*pt, ps); // wait for all - profiler_.LogEnd(ThreadPoolProfiler::WAIT); -} - -int NumThreads() const final { - return num_threads_; -} - -int CurrentThreadId() const final { - const PerThread* pt = const_cast(this)->GetPerThread(); - if (pt->pool == this) { - return pt->thread_id; + + void DisableSpinning() { + spin_loop_status_ = SpinLoopStatus::kIdle; } - return -1; -} private: void ComputeCoprimes(int N, Eigen::MaxSizeVector* coprimes) { @@ -1392,6 +1395,7 @@ int CurrentThreadId() const final { assert(seen != ThreadStatus::Blocking); if (seen == ThreadStatus::Blocked) { status = ThreadStatus::Waking; + lk.unlock(); cv.notify_one(); } } @@ -1400,12 +1404,12 @@ int CurrentThreadId() const final { // State transitions, called only from the thread itself void SetActive() { - std::unique_lock lk(mutex); + std::lock_guard lk(mutex); status = ThreadStatus::Active; } void SetSpinning() { - std::unique_lock lk(mutex); + std::lock_guard lk(mutex); status = ThreadStatus::Spinning; } @@ -1424,7 +1428,7 @@ int CurrentThreadId() const final { status = ThreadStatus::Spinning; } - private: + private: std::atomic status{ThreadStatus::Spinning}; OrtMutex mutex; OrtCondVar cv; @@ -1439,13 +1443,25 @@ int CurrentThreadId() const final { std::atomic blocked_; // Count of blocked workers, used as a termination condition std::atomic done_; + // SpinLoopStatus indicates whether the main worker spinning (inner) loop should exit immediately when there is + // no work available (kIdle) or whether it should follow the configured spin-then-block policy (kBusy). + // This lets the ORT session layer hint to the thread pool that it should stop spinning in between + // requests. + enum class SpinLoopStatus { + kIdle, + kBusy + }; + + // Default is no control over spinning + std::atomic spin_loop_status_{SpinLoopStatus::kBusy}; + // Wake any blocked workers so that they can cleanly exit WorkerLoop(). For // a clean exit, each thread will observe (1) done_ set, indicating that the // destructor has been called, (2) all threads blocked, and (3) no // items in the work queues. void WakeAllWorkersForExit() { - for (auto &td: worker_data_) { + for (auto& td : worker_data_) { td.EnsureAwake(); } } @@ -1462,8 +1478,8 @@ int CurrentThreadId() const final { assert(td.GetStatus() == WorkerData::ThreadStatus::Spinning); constexpr int log2_spin = 20; - const int spin_count = allow_spinning_ ? (1ull< bool { - bool should_block = true; - // Check whether work was pushed to us while attempting to block. We make - // this test while holding the per-thread status lock, and after setting - // our status to ThreadStatus::Blocking. 
- // - // This synchronizes with ThreadPool::Schedule which pushes work to the queue - // and then tests for ThreadStatus::Blocking/Blocked (via EnsureAwake): - // - // Main thread: Worker: - // #1 Push work #A Set status blocking - // #2 Read worker status #B Check queue - // #3 Wake if blocking/blocked - // - // If #A is before #2 then main sees worker blocked and wakes - // - // If #A if after #2 then #B will see #1, and we abandon blocking - assert(!t); - t = q.PopFront(); - if (t) { - should_block = false; - } - - // No work pushed to us, continue attempting to block. The remaining - // test is to synchronize with termination requests. If we are - // shutting down and all worker threads blocked without work, that's - // we are done. - if (should_block) { - blocked_++; - if (done_ && blocked_ == num_threads_) { - should_block = false; - // Almost done, but need to re-check queues. - // Consider that all queues are empty and all worker threads are preempted - // right after incrementing blocked_ above. Now a free-standing thread - // submits work and calls destructor (which sets done_). If we don't - // re-check queues, we will exit leaving the work unexecuted. - if (NonEmptyQueueIndex() != -1) { - // Note: we must not pop from queues before we decrement blocked_, - // otherwise the following scenario is possible. Consider that instead - // of checking for emptiness we popped the only element from queues. - // Now other worker threads can start exiting, which is bad if the - // work item submits other work. So we just check emptiness here, - // which ensures that all worker threads exit at the same time. - blocked_--; - } else { - should_exit = true; - } - } - } - return should_block; - }, - // Post-block update (executed only if we blocked) - [&]() { - blocked_--; - }); + td.SetBlocked( // Pre-block test + [&]() -> bool { + bool should_block = true; + // Check whether work was pushed to us while attempting to block. We make + // this test while holding the per-thread status lock, and after setting + // our status to ThreadStatus::Blocking. + // + // This synchronizes with ThreadPool::Schedule which pushes work to the queue + // and then tests for ThreadStatus::Blocking/Blocked (via EnsureAwake): + // + // Main thread: Worker: + // #1 Push work #A Set status blocking + // #2 Read worker status #B Check queue + // #3 Wake if blocking/blocked + // + // If #A is before #2 then main sees worker blocked and wakes + // + // If #A if after #2 then #B will see #1, and we abandon blocking + assert(!t); + t = q.PopFront(); + if (t) { + should_block = false; + } + + // No work pushed to us, continue attempting to block. The remaining + // test is to synchronize with termination requests. If we are + // shutting down and all worker threads blocked without work, that's + // we are done. + if (should_block) { + blocked_++; + if (done_ && blocked_ == num_threads_) { + should_block = false; + // Almost done, but need to re-check queues. + // Consider that all queues are empty and all worker threads are preempted + // right after incrementing blocked_ above. Now a free-standing thread + // submits work and calls destructor (which sets done_). If we don't + // re-check queues, we will exit leaving the work unexecuted. + if (NonEmptyQueueIndex() != -1) { + // Note: we must not pop from queues before we decrement blocked_, + // otherwise the following scenario is possible. Consider that instead + // of checking for emptiness we popped the only element from queues. 
+ // Now other worker threads can start exiting, which is bad if the + // work item submits other work. So we just check emptiness here, + // which ensures that all worker threads exit at the same time. + blocked_--; + } else { + should_exit = true; + } + } + } + return should_block; + }, + // Post-block update (executed only if we blocked) + [&]() { + blocked_--; + }); // Thread just unblocked. Unless we picked up work while // blocking, or are exiting, then either work was pushed to // us, or it was pushed to an overloaded queue @@ -1548,6 +1567,7 @@ int CurrentThreadId() const final { if (!t) t = Steal(StealAttemptKind::TRY_ALL); } } + if (t) { td.SetActive(); t(); @@ -1555,7 +1575,7 @@ int CurrentThreadId() const final { td.SetSpinning(); } } - + // Whichever thread(s) observe the termination conditions are responsible for waking // any other threads that have remained blocked. if (should_exit) { @@ -1577,7 +1597,7 @@ int CurrentThreadId() const final { unsigned r = Rand(&pt->rand); unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()]; unsigned victim = r % size; - + for (unsigned i = 0; i < num_attempts; i++) { assert(victim < size); if (worker_data_[victim].GetStatus() == WorkerData::ThreadStatus::Active) { @@ -1636,6 +1656,6 @@ int CurrentThreadId() const final { } }; - } // namespace concurrency +} // namespace concurrency } // namespace onnxruntime diff --git a/include/onnxruntime/core/platform/threadpool.h b/include/onnxruntime/core/platform/threadpool.h index 23f317f97751..10e8c1db5b38 100644 --- a/include/onnxruntime/core/platform/threadpool.h +++ b/include/onnxruntime/core/platform/threadpool.h @@ -221,6 +221,14 @@ class ThreadPool { "Per-thread state should be trivially destructible"); }; + // The below API allows to disable spinning + // This is used to support real-time scenarios where + // spinning between relatively infrequent requests + // contributes to high CPU usage while not processing anything. + void EnableSpinning(); + + void DisableSpinning(); + // Schedules fn() for execution in the pool of threads. The function may run // synchronously if it cannot be enqueued. This will occur if the thread pool's // degree-of-parallelism is 1, but it may also occur for implementation-dependent diff --git a/include/onnxruntime/core/session/onnxruntime_session_options_config_keys.h b/include/onnxruntime/core/session/onnxruntime_session_options_config_keys.h index f1a7009a8997..01d49b16a85e 100644 --- a/include/onnxruntime/core/session/onnxruntime_session_options_config_keys.h +++ b/include/onnxruntime/core/session/onnxruntime_session_options_config_keys.h @@ -114,6 +114,13 @@ static const char* const kOrtSessionOptionsConfigNnapiEpPartitioningStopOps = "e // Available since version 1.11. static const char* const kOrtSessionOptionsConfigDynamicBlockBase = "session.dynamic_block_base"; +// This option allows to decrease CPU usage between infrequent +// requests and forces any TP threads spinning stop immediately when the last of +// concurrent Run() call returns. +// Spinning is restarted on the next Run() call. +// Applies only to internal thread-pools +static const char* const kOrtSessionOptionsConfigForceSpinningStop = "session.force_spinning_stop"; + // "1": all inconsistencies encountered during shape and type inference // will result in failures. // "0": in some cases warnings will be logged but processing will continue. The default. 
diff --git a/onnxruntime/core/common/threadpool.cc b/onnxruntime/core/common/threadpool.cc index ad12198091b5..c10a6ea70527 100644 --- a/onnxruntime/core/common/threadpool.cc +++ b/onnxruntime/core/common/threadpool.cc @@ -657,6 +657,18 @@ std::string ThreadPool::StopProfiling(concurrency::ThreadPool* tp) { } } +void ThreadPool::EnableSpinning() { + if (extended_eigen_threadpool_) { + extended_eigen_threadpool_->EnableSpinning(); + } +} + +void ThreadPool::DisableSpinning() { + if (extended_eigen_threadpool_) { + extended_eigen_threadpool_->DisableSpinning(); + } +} + // Return the number of threads created by the pool. int ThreadPool::NumThreads() const { if (underlying_threadpool_) { diff --git a/onnxruntime/core/session/inference_session.cc b/onnxruntime/core/session/inference_session.cc index 168a12698fc9..10078eaf7c96 100644 --- a/onnxruntime/core/session/inference_session.cc +++ b/onnxruntime/core/session/inference_session.cc @@ -264,6 +264,7 @@ void InferenceSession::ConstructorCommon(const SessionOptions& session_options, } use_per_session_threads_ = session_options.use_per_session_threads; + force_spinning_stop_between_runs_ = session_options_.config_options.GetConfigOrDefault(kOrtSessionOptionsConfigForceSpinningStop, "0") == "1"; if (use_per_session_threads_) { LOGS(*session_logger_, INFO) << "Creating and using per session threadpools since use_per_session_threads_ is true"; @@ -1835,6 +1836,31 @@ Status InferenceSession::PartialRun(onnxruntime::RunOptions& run_options, } #endif +namespace { +// Concurrent runs counting and thread-pool spin control +struct ThreadPoolSpinningSwitch { + concurrency::ThreadPool* intra_tp_{nullptr}; + concurrency::ThreadPool* inter_tp_{nullptr}; + std::atomic& concurrent_num_runs_; + // __Ctor Refcounting and spinning control + ThreadPoolSpinningSwitch(concurrency::ThreadPool* intra_tp, + concurrency::ThreadPool* inter_tp, + std::atomic& ref) noexcept + : intra_tp_(intra_tp), inter_tp_(inter_tp), concurrent_num_runs_(ref) { + if (concurrent_num_runs_.fetch_add(1, std::memory_order_relaxed) == 0) { + if (intra_tp_) intra_tp_->EnableSpinning(); + if (inter_tp_) inter_tp_->EnableSpinning(); + } + } + ~ThreadPoolSpinningSwitch() { + if (1 == concurrent_num_runs_.fetch_sub(1, std::memory_order_acq_rel)) { + if (intra_tp_) intra_tp_->DisableSpinning(); + if (inter_tp_) inter_tp_->DisableSpinning(); + } + } +}; +} // namespace + Status InferenceSession::Run(const RunOptions& run_options, const std::vector& feed_names, const std::vector& feeds, const std::vector& output_names, std::vector* p_fetches, @@ -1852,12 +1878,20 @@ Status InferenceSession::Run(const RunOptions& run_options, Status retval = Status::OK(); const Env& env = Env::Default(); + // Increment/decrement concurrent_num_runs_ and control + // session threads spinning as configured. Do nothing for graph replay except the counter. + const bool control_spinning = use_per_session_threads_ && + force_spinning_stop_between_runs_ && + !cached_execution_provider_for_graph_replay_.IsGraphCaptured(); + auto* intra_tp = (control_spinning) ? thread_pool_.get() : nullptr; + auto* inter_tp = (control_spinning) ? inter_op_thread_pool_.get() : nullptr; + ThreadPoolSpinningSwitch runs_refcounter_and_tp_spin_control(intra_tp, inter_tp, current_num_runs_); + // Check if this Run() is simply going to be a CUDA Graph replay. 
if (cached_execution_provider_for_graph_replay_.IsGraphCaptured()) { LOGS(*session_logger_, INFO) << "Replaying the captured " << cached_execution_provider_for_graph_replay_.Type() << " CUDA Graph for this model with tag: " << run_options.run_tag; - ++current_num_runs_; ORT_RETURN_IF_ERROR_SESSIONID_(cached_execution_provider_for_graph_replay_.ReplayGraph()); } else { std::vector exec_providers_to_stop; @@ -1902,8 +1936,6 @@ Status InferenceSession::Run(const RunOptions& run_options, LOGS(*session_logger_, INFO) << "Running with tag: " << run_options.run_tag; } - ++current_num_runs_; - // scope of owned_run_logger is just the call to Execute. // If Execute ever becomes async we need a different approach std::unique_ptr owned_run_logger; @@ -1939,6 +1971,7 @@ Status InferenceSession::Run(const RunOptions& run_options, #ifdef DEBUG_NODE_INPUTS_OUTPUTS session_state_->IncrementGraphExecutionCounter(); #endif + ORT_CHECK_AND_SET_RETVAL(utils::ExecuteGraph(*session_state_, feeds_fetches_manager, feeds, *p_fetches, session_options_.execution_mode, run_options.terminate, run_logger, run_options.only_execute_path_to_fetches)); @@ -1962,7 +1995,6 @@ Status InferenceSession::Run(const RunOptions& run_options, ShrinkMemoryArenas(arenas_to_shrink); } } - --current_num_runs_; // keep track of telemetry ++telemetry_.total_runs_since_last_; diff --git a/onnxruntime/core/session/inference_session.h b/onnxruntime/core/session/inference_session.h index 996011da4329..3539f5a4ca18 100644 --- a/onnxruntime/core/session/inference_session.h +++ b/onnxruntime/core/session/inference_session.h @@ -664,6 +664,12 @@ class InferenceSession { std::basic_string thread_pool_name_; std::basic_string inter_thread_pool_name_; + // This option allows to decrease CPU usage between infrequent + // requests and forces any TP threads spinning stop immediately when the last of + // concurrent ExecuteGraph() call returns. + // Spinning is restarted on the next Run() + bool force_spinning_stop_between_runs_ = false; + std::unique_ptr thread_pool_; std::unique_ptr inter_op_thread_pool_;
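To summarize the Run()-side integration added above, the sketch below is a minimal, self-contained model of the first-in / last-out counting that ThreadPoolSpinningSwitch performs: the first concurrent Run() to enter re-enables spinning, and the last one to leave disables it. FakePool and the main() driver are hypothetical stand-ins; only the atomic reference counting mirrors the patch.

// RAII spin-control counter, modelled on ThreadPoolSpinningSwitch.
#include <atomic>
#include <cstdio>

struct FakePool {
  void EnableSpinning() { std::puts("spinning enabled"); }
  void DisableSpinning() { std::puts("spinning disabled"); }
};

class SpinningSwitch {
 public:
  SpinningSwitch(FakePool* pool, std::atomic<int>& runs) noexcept
      : pool_(pool), runs_(runs) {
    // First concurrent Run() in: turn spinning on.
    if (runs_.fetch_add(1, std::memory_order_relaxed) == 0 && pool_) pool_->EnableSpinning();
  }
  ~SpinningSwitch() {
    // Last concurrent Run() out: turn spinning off.
    if (runs_.fetch_sub(1, std::memory_order_acq_rel) == 1 && pool_) pool_->DisableSpinning();
  }

 private:
  FakePool* pool_;
  std::atomic<int>& runs_;
};

int main() {
  FakePool pool;
  std::atomic<int> concurrent_runs{0};
  {
    SpinningSwitch outer(&pool, concurrent_runs);  // enables spinning
    SpinningSwitch inner(&pool, concurrent_runs);  // no-op: already enabled
  }  // last switch destroyed here disables spinning
  return 0;
}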