From 93f290d57c804f3a672b7b45468aae64a0f0504f Mon Sep 17 00:00:00 2001 From: Christoffer Lerno Date: Sat, 28 Sep 2024 01:25:00 +0200 Subject: [PATCH] Added a simple fixed threadpool which allocates. --- lib/std/threads/fixed_pool.c3 | 172 ++++++++++++++++++++++++++++++++++ lib/std/threads/pool.c3 | 2 +- 2 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 lib/std/threads/fixed_pool.c3 diff --git a/lib/std/threads/fixed_pool.c3 b/lib/std/threads/fixed_pool.c3 new file mode 100644 index 000000000..c42eb4f5b --- /dev/null +++ b/lib/std/threads/fixed_pool.c3 @@ -0,0 +1,172 @@ +module std::thread::threadpool; +import std::thread; + +fault ThreadPoolResult +{ + QUEUE_FULL +} + +def ThreadPoolFn = fn void(any[] args); + +struct FixedThreadPool @adhoc +{ + Mutex mu; + QueueItem[] queue; + usz qindex; + usz num_threads; + bitstruct : char { + bool initialized; + bool stop; + bool stop_now; + } + Thread[] pool; + ConditionVariable notify; +} + +struct QueueItem @private +{ + ThreadPoolFn func; + any[] args; +} + +/** + * @require !self.initialized "ThreadPool must not be already initialized" + * @require threads > 0 && threads < 0x1000 `Threads should be greater than 0 and less than 0x1000` + * @require queue_size < 0x10000 `Queue size must be less than 65536` + **/ +fn void! FixedThreadPool.init(&self, usz threads, usz queue_size = 0) +{ + if (queue_size == 0) queue_size = threads * 32; + defer catch @ok(self.destroy()); + assert(queue_size > 0); + *self = { + .num_threads = threads, + .initialized = true, + .queue = mem::alloc_array(QueueItem, queue_size), + .pool = mem::new_array(Thread, threads) + }; + self.mu.init()!; + self.notify.init()!; + foreach (&thread : self.pool) + { + thread.create(&process_work, self)!; + // The thread resources will be cleaned up when the thread exits. + thread.detach()!; + } +} + +/* + * Stop all the threads and cleanup the pool. + * Any pending work will be dropped. + */ +fn void! FixedThreadPool.destroy(&self) +{ + return self.@shutdown(stop_now); +} + +/* + * Stop all the threads and cleanup the pool. + * Any pending work will be processed. + */ +fn void! FixedThreadPool.stop_and_destroy(&self) +{ + return self.@shutdown(stop); +} + +macro void! FixedThreadPool.@shutdown(&self, #stop) @private +{ + if (self.initialized) + { + self.mu.lock()!; + self.#stop = true; + self.notify.broadcast()!; + self.mu.unlock()!; + // Wait for all threads to shutdown. + while (true) + { + self.mu.lock()!; + defer self.mu.unlock()!!; + if (self.num_threads == 0) + { + break; + } + self.notify.signal()!; + } + self.mu.destroy()!; + self.initialized = false; + while (self.qindex) + { + free_qitem(self.queue[--self.qindex]); + } + free(self.queue); + self.queue = {}; + } +} + +/* + * Push a new job to the pool. + * Returns whether the queue is full, in which case the job is ignored. + */ +fn void! FixedThreadPool.push(&self, ThreadPoolFn func, args...) +{ + self.mu.lock()!; + defer self.mu.unlock()!!; + if (self.qindex == self.queue.len) return ThreadPoolResult.QUEUE_FULL?; + any[] data; + if (args.len) + { + data = mem::alloc_array(any, args.len); + foreach (i, arg : args) data[i] = allocator::clone_any(allocator::heap(), arg); + } + self.queue[self.qindex] = { .func = func, .args = data }; + self.qindex++; + // Notify the threads that work is available. + self.notify.broadcast()!; +} + +fn int process_work(void* self_arg) @private +{ + FixedThreadPool* self = self_arg; + while (true) + { + self.mu.lock()!!; + if (self.stop_now) + { + // Shutdown requested. + self.num_threads--; + self.mu.unlock()!!; + return 0; + } + // Wait for work. + while (self.qindex == 0) + { + if (self.stop) + { + // Shutdown requested. + self.num_threads--; + self.mu.unlock()!!; + return 0; + } + self.notify.wait(&self.mu)!!; + if (self.stop_now) + { + // Shutdown requested. + self.num_threads--; + self.mu.unlock()!!; + return 0; + } + } + // Process the job. + self.qindex--; + QueueItem item = self.queue[self.qindex]; + self.mu.unlock()!!; + defer free_qitem(item); + item.func(item.args); + } +} + +fn void free_qitem(QueueItem item) @private +{ + foreach (arg : item.args) free(arg.ptr); + free(item.args); +} diff --git a/lib/std/threads/pool.c3 b/lib/std/threads/pool.c3 index 17b8b20cd..c92f0077b 100644 --- a/lib/std/threads/pool.c3 +++ b/lib/std/threads/pool.c3 @@ -17,7 +17,7 @@ struct ThreadPool ConditionVariable notify; } -struct QueueItem +struct QueueItem @private { ThreadFn func; void* arg;