-
Notifications
You must be signed in to change notification settings - Fork 0
/
TaskController.cpp
133 lines (115 loc) · 3.08 KB
/
TaskController.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
* Copyright (C) 2021 Ilya Entin
*/
#include "TaskController.h"
#include "Logger.h"
#include "ServerOptions.h"
#include "Task.h"
TaskControllerPtr TaskController::_single;
TaskController::Phase TaskController::_phase = PREPROCESSTASK;
TaskController::TaskController() :
_barrier(ServerOptions::_numberWorkThreads, onTaskCompletion),
_threadPool(ServerOptions::_numberWorkThreads) {
// start with empty task
_task = std::make_shared<Task>();
}
TaskController::~TaskController() {
Trace << '\n';
}
TaskControllerWeakPtr TaskController::getWeakPtr() {
return _single;
}
// This method is called by one of the threads
// when the current barrier phase completes.
void TaskController::onTaskCompletion() noexcept {
auto ptr = _single;
if (ptr)
ptr->onCompletion();
}
void TaskController::onCompletion() {
switch (_phase) {
case PREPROCESSTASK:
if (ServerOptions::_sortInput)
_task->sortIndices();
_task->resetIndex();
_phase = PROCESSTASK;
break;
case PROCESSTASK:
_task->finish();
// Blocks until the new task is available.
setNextTask();
_task->resetIndex();
_phase = PREPROCESSTASK;
break;
default:
break;
}
}
bool TaskController::start() {
for (int i = 0; i < ServerOptions::_numberWorkThreads; ++i) {
auto worker = std::make_shared<Worker>(_single);
_threadPool.push(worker);
}
return true;
}
void TaskController::push(TaskPtr task) {
std::lock_guard lock(_queueMutex);
_queue.push(task);
_queueCondition.notify_one();
}
void TaskController::processTask(TaskPtr task) {
auto future = task->getPromise().get_future();
push(task);
future.get();
}
void TaskController::setNextTask() {
std::unique_lock lock(_queueMutex);
_queueCondition.wait(lock, [this] { return !_queue.empty() || _stopped; });
if (_stopped)
return;
_task = _queue.front();
_queue.pop();
}
bool TaskController::create() {
_single = std::make_shared<TaskController>();
return _single->start();
}
void TaskController::stop() {
// stop threads
{
std::lock_guard lock(_queueMutex);
_stopped.store(true);
_queueCondition.notify_one();
}
_threadPool.stop();
}
void TaskController::destroy() {
if (_single)
_single->stop();
// destroy controller
TaskControllerPtr().swap(_single);
}
TaskController::Worker::Worker(TaskControllerWeakPtr taskController) :
Runnable(ServerOptions::_numberWorkThreads),
_taskController(taskController) {}
// Process the current task (batch of requests) by all threads. Arrive
// at the sync point when the task is done and wait for the next one.
void TaskController::Worker::run() noexcept {
auto taskController = _taskController.lock();
if (!taskController)
return;
auto& stopped = taskController->_stopped;
auto& task = taskController->_task;
auto& barrier = taskController->_barrier;
while (!stopped) {
if (_phase == PROCESSTASK) {
while (task->processNext());
barrier.arrive_and_wait();
}
else if (_phase == PREPROCESSTASK) {
if (Task::_preprocessRequest)
while (task->preprocessNext());
barrier.arrive_and_wait();
}
}
}