From 0ebfa538fd81916045664790337ce48b66e2aa19 Mon Sep 17 00:00:00 2001 From: Andrew Bell Date: Sat, 24 Aug 2024 16:09:40 -0400 Subject: [PATCH] Add support for accepting std::function in a job queue. (#10628) Simplify some of the wait logic. --- port/cpl_worker_thread_pool.cpp | 106 ++++++++++++-------------------- port/cpl_worker_thread_pool.h | 6 +- 2 files changed, 43 insertions(+), 69 deletions(-) diff --git a/port/cpl_worker_thread_pool.cpp b/port/cpl_worker_thread_pool.cpp index 90db56f418bb..c26fbd3d8030 100644 --- a/port/cpl_worker_thread_pool.cpp +++ b/port/cpl_worker_thread_pool.cpp @@ -354,10 +354,8 @@ void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs) if (nMaxRemainingJobs < 0) nMaxRemainingJobs = 0; std::unique_lock oGuard(m_mutex); - while (nPendingJobs > nMaxRemainingJobs) - { - m_cv.wait(oGuard); - } + m_cv.wait(oGuard, [this, nMaxRemainingJobs] + { return nPendingJobs <= nMaxRemainingJobs; }); } /************************************************************************/ @@ -368,21 +366,15 @@ void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs) */ void CPLWorkerThreadPool::WaitEvent() { + // NOTE - This isn't quite right. After nPendingJobsBefore is set but before + // a notification occurs, jobs could be submitted which would increase + // nPendingJobs, so a job completion may looks like a spurious wakeup. std::unique_lock oGuard(m_mutex); - while (true) - { - const int nPendingJobsBefore = nPendingJobs; - if (nPendingJobsBefore == 0) - { - break; - } - m_cv.wait(oGuard); - // cppcheck-suppress knownConditionTrueFalse - if (nPendingJobs < nPendingJobsBefore) - { - break; - } - } + if (nPendingJobs == 0) + return; + const int nPendingJobsBefore = nPendingJobs; + m_cv.wait(oGuard, [this, nPendingJobsBefore] + { return nPendingJobs < nPendingJobsBefore; }); } /************************************************************************/ @@ -580,29 +572,6 @@ CPLJobQueue::~CPLJobQueue() WaitCompletion(); } -/************************************************************************/ -/* JobQueueJob */ -/************************************************************************/ - -struct JobQueueJob -{ - CPLJobQueue *poQueue = nullptr; - CPLThreadFunc pfnFunc = nullptr; - void *pData = nullptr; -}; - -/************************************************************************/ -/* JobQueueFunction() */ -/************************************************************************/ - -void CPLJobQueue::JobQueueFunction(void *pData) -{ - JobQueueJob *poJob = static_cast(pData); - poJob->pfnFunc(poJob->pData); - poJob->poQueue->DeclareJobFinished(); - delete poJob; -} - /************************************************************************/ /* DeclareJobFinished() */ /************************************************************************/ @@ -626,16 +595,28 @@ void CPLJobQueue::DeclareJobFinished() */ bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData) { - JobQueueJob *poJob = new JobQueueJob; - poJob->poQueue = this; - poJob->pfnFunc = pfnFunc; - poJob->pData = pData; + return SubmitJob([=] { pfnFunc(pData); }); +} + +/** Queue a new job. + * + * @param task Task to execute. + * @return true in case of success. + */ +bool CPLJobQueue::SubmitJob(std::function task) +{ { std::lock_guard oGuard(m_mutex); m_nPendingJobs++; } + // cppcheck-suppress knownConditionTrueFalse - return m_poPool->SubmitJob(JobQueueFunction, poJob); + return m_poPool->SubmitJob( + [this, task] + { + task(); + DeclareJobFinished(); + }); } /************************************************************************/ @@ -651,11 +632,8 @@ bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData) void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs) { std::unique_lock oGuard(m_mutex); - // coverity[missing_lock:FALSE] - while (m_nPendingJobs > nMaxRemainingJobs) - { - m_cv.wait(oGuard); - } + m_cv.wait(oGuard, [this, nMaxRemainingJobs] + { return m_nPendingJobs <= nMaxRemainingJobs; }); } /************************************************************************/ @@ -668,21 +646,15 @@ void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs) */ bool CPLJobQueue::WaitEvent() { + // NOTE - This isn't quite right. After nPendingJobsBefore is set but before + // a notification occurs, jobs could be submitted which would increase + // nPendingJobs, so a job completion may looks like a spurious wakeup. std::unique_lock oGuard(m_mutex); - while (true) - { - // coverity[missing_lock:FALSE] - const int nPendingJobsBefore = m_nPendingJobs; - if (nPendingJobsBefore == 0) - { - return false; - } - m_cv.wait(oGuard); - // cppcheck-suppress knownConditionTrueFalse - // coverity[missing_lock:FALSE] - if (m_nPendingJobs < nPendingJobsBefore) - { - return m_nPendingJobs > 0; - } - } + if (m_nPendingJobs == 0) + return false; + + const int nPendingJobsBefore = m_nPendingJobs; + m_cv.wait(oGuard, [this, nPendingJobsBefore] + { return m_nPendingJobs < nPendingJobsBefore; }); + return m_nPendingJobs > 0; } diff --git a/port/cpl_worker_thread_pool.h b/port/cpl_worker_thread_pool.h index e88978eb71f4..25d97fd54fa3 100644 --- a/port/cpl_worker_thread_pool.h +++ b/port/cpl_worker_thread_pool.h @@ -74,6 +74,8 @@ typedef enum #endif // ndef DOXYGEN_SKIP class CPLJobQueue; +/// Unique pointer to a job queue. +using CPLJobQueuePtr = std::unique_ptr; /** Pool of worker threads */ class CPL_DLL CPLWorkerThreadPool @@ -106,7 +108,7 @@ class CPL_DLL CPLWorkerThreadPool bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData, bool bWaitallStarted); - std::unique_ptr CreateJobQueue(); + CPLJobQueuePtr CreateJobQueue(); bool SubmitJob(std::function task); bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); @@ -127,7 +129,6 @@ class CPL_DLL CPLJobQueue std::condition_variable m_cv{}; int m_nPendingJobs = 0; - static void JobQueueFunction(void *); void DeclareJobFinished(); //! @cond Doxygen_Suppress @@ -146,6 +147,7 @@ class CPL_DLL CPLJobQueue } bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); + bool SubmitJob(std::function task); void WaitCompletion(int nMaxRemainingJobs = 0); bool WaitEvent(); };