Skip to content

Commit

Permalink
Add support for accepting std::function<void()> in a job queue. (#10628)
Browse files Browse the repository at this point in the history
Simplify some of the wait logic.
  • Loading branch information
abellgithub committed Aug 24, 2024
1 parent b39ca5a commit 0ebfa53
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 69 deletions.
106 changes: 39 additions & 67 deletions port/cpl_worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,8 @@ void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
if (nMaxRemainingJobs < 0)
nMaxRemainingJobs = 0;
std::unique_lock<std::mutex> oGuard(m_mutex);
while (nPendingJobs > nMaxRemainingJobs)
{
m_cv.wait(oGuard);
}
m_cv.wait(oGuard, [this, nMaxRemainingJobs]
{ return nPendingJobs <= nMaxRemainingJobs; });
}

/************************************************************************/
Expand All @@ -368,21 +366,15 @@ void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
*/
void CPLWorkerThreadPool::WaitEvent()
{
// NOTE - This isn't quite right. After nPendingJobsBefore is set but before
// a notification occurs, jobs could be submitted which would increase
// nPendingJobs, so a job completion may looks like a spurious wakeup.
std::unique_lock<std::mutex> oGuard(m_mutex);
while (true)
{
const int nPendingJobsBefore = nPendingJobs;
if (nPendingJobsBefore == 0)
{
break;
}
m_cv.wait(oGuard);
// cppcheck-suppress knownConditionTrueFalse
if (nPendingJobs < nPendingJobsBefore)
{
break;
}
}
if (nPendingJobs == 0)
return;
const int nPendingJobsBefore = nPendingJobs;
m_cv.wait(oGuard, [this, nPendingJobsBefore]
{ return nPendingJobs < nPendingJobsBefore; });
}

/************************************************************************/
Expand Down Expand Up @@ -580,29 +572,6 @@ CPLJobQueue::~CPLJobQueue()
WaitCompletion();
}

/************************************************************************/
/* JobQueueJob */
/************************************************************************/

struct JobQueueJob
{
CPLJobQueue *poQueue = nullptr;
CPLThreadFunc pfnFunc = nullptr;
void *pData = nullptr;
};

/************************************************************************/
/* JobQueueFunction() */
/************************************************************************/

void CPLJobQueue::JobQueueFunction(void *pData)
{
JobQueueJob *poJob = static_cast<JobQueueJob *>(pData);
poJob->pfnFunc(poJob->pData);
poJob->poQueue->DeclareJobFinished();
delete poJob;
}

/************************************************************************/
/* DeclareJobFinished() */
/************************************************************************/
Expand All @@ -626,16 +595,28 @@ void CPLJobQueue::DeclareJobFinished()
*/
bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
{
JobQueueJob *poJob = new JobQueueJob;
poJob->poQueue = this;
poJob->pfnFunc = pfnFunc;
poJob->pData = pData;
return SubmitJob([=] { pfnFunc(pData); });
}

/** Queue a new job.
*
* @param task Task to execute.
* @return true in case of success.
*/
bool CPLJobQueue::SubmitJob(std::function<void()> task)
{
{
std::lock_guard<std::mutex> oGuard(m_mutex);
m_nPendingJobs++;
}

// cppcheck-suppress knownConditionTrueFalse
return m_poPool->SubmitJob(JobQueueFunction, poJob);
return m_poPool->SubmitJob(
[this, task]
{
task();
DeclareJobFinished();
});
}

/************************************************************************/
Expand All @@ -651,11 +632,8 @@ bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
{
std::unique_lock<std::mutex> oGuard(m_mutex);
// coverity[missing_lock:FALSE]
while (m_nPendingJobs > nMaxRemainingJobs)
{
m_cv.wait(oGuard);
}
m_cv.wait(oGuard, [this, nMaxRemainingJobs]
{ return m_nPendingJobs <= nMaxRemainingJobs; });
}

/************************************************************************/
Expand All @@ -668,21 +646,15 @@ void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
*/
bool CPLJobQueue::WaitEvent()
{
// NOTE - This isn't quite right. After nPendingJobsBefore is set but before
// a notification occurs, jobs could be submitted which would increase
// nPendingJobs, so a job completion may looks like a spurious wakeup.
std::unique_lock<std::mutex> oGuard(m_mutex);
while (true)
{
// coverity[missing_lock:FALSE]
const int nPendingJobsBefore = m_nPendingJobs;
if (nPendingJobsBefore == 0)
{
return false;
}
m_cv.wait(oGuard);
// cppcheck-suppress knownConditionTrueFalse
// coverity[missing_lock:FALSE]
if (m_nPendingJobs < nPendingJobsBefore)
{
return m_nPendingJobs > 0;
}
}
if (m_nPendingJobs == 0)
return false;

const int nPendingJobsBefore = m_nPendingJobs;
m_cv.wait(oGuard, [this, nPendingJobsBefore]
{ return m_nPendingJobs < nPendingJobsBefore; });
return m_nPendingJobs > 0;
}
6 changes: 4 additions & 2 deletions port/cpl_worker_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ typedef enum
#endif // ndef DOXYGEN_SKIP

class CPLJobQueue;
/// Unique pointer to a job queue.
using CPLJobQueuePtr = std::unique_ptr<CPLJobQueue>;

/** Pool of worker threads */
class CPL_DLL CPLWorkerThreadPool
Expand Down Expand Up @@ -106,7 +108,7 @@ class CPL_DLL CPLWorkerThreadPool
bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData,
bool bWaitallStarted);

std::unique_ptr<CPLJobQueue> CreateJobQueue();
CPLJobQueuePtr CreateJobQueue();

bool SubmitJob(std::function<void()> task);
bool SubmitJob(CPLThreadFunc pfnFunc, void *pData);
Expand All @@ -127,7 +129,6 @@ class CPL_DLL CPLJobQueue
std::condition_variable m_cv{};
int m_nPendingJobs = 0;

static void JobQueueFunction(void *);
void DeclareJobFinished();

//! @cond Doxygen_Suppress
Expand All @@ -146,6 +147,7 @@ class CPL_DLL CPLJobQueue
}

bool SubmitJob(CPLThreadFunc pfnFunc, void *pData);
bool SubmitJob(std::function<void()> task);
void WaitCompletion(int nMaxRemainingJobs = 0);
bool WaitEvent();
};
Expand Down

0 comments on commit 0ebfa53

Please sign in to comment.