From 76b9ce9b1f7d2b7e64b4b9c9d456a02b8a010473 Mon Sep 17 00:00:00 2001 From: Adam Straw Date: Mon, 13 Jun 2022 10:22:54 -0700 Subject: [PATCH] [Hexagon] Add HexagonThreadManager (#11653) * Adding initial threadmanager class * Fixed compile errors * Moving constant defines inside class * Updating qurt includes * use default scope for hexagon buffers * Updating buffer allocations * Fixed bug where array of pointers treated as array of structs * - Updated HexgonDeviceAPI to use HexagonThreadManager - Updated HexagonThreadManager interface to use TVMStreams - Added second `Dispatch` interfce in thread manager to use PackedFuncs - Updated thread manager to use vector for dynamic semaphore allocation - Added "#if defined(__hexagon__)" in several places to prevent compilation errors * Bug fixes + interface addition + basic thread tests - Fixed GetStreams not returning the streams properly - Added missing semaphore cleanup to prevent qurt kernel resource leakage - new interface functions: - Start() : now all worker threads are blocked on initialization until ThreadManager->Start() is called - WaitOnThreads() : blocking call which waits until all worker thread queues are empty - added extra debug logging - Two new basic thread tests working * Adding initial ThreadManager tests * HexagonThreadManager tests and refactor * remove stack / pipe size member vars * init pointers in the header file * move all mem allocs to SpawnThreads * start_semaphore as object instead of pointer * fix bug with WaitOnThreads deadlock + Wait/Signal off by one error * add / refactor Signal / Wait tests * add SyncFromTo test cases * add Dispatch test cases * add pipe fill and overflow cases * Updating dispatch to return bool and fix pipe overflow problem * change around min / max values for stack / pipe * integrate pipe fill / overflow tests back into HTM test suite * use HexagonBuffer * assert if stack / pipe sizes fall below min * Changed semaphore vector to store pointers, not structs (fixes vector capacity adjustment invaliding in-use addresses). * add producer consumer, thread order test cases * change to unordered_map for semaphores and remove PreallocateSyncs * tests running on device * code cleanup for compile warnings * remove #if defined(__hexagon__) guards * copyright, format, lint * add hexagon buffer map class * remove redundant thread manager tests * revert Hex Dev API changes for threading * add comments; remove untested code to dispatch / wrap a packed func * pass pipe address and not HTM pointer to thread context * rename to HexagonBufferManager * cleanup ahead of PR * use DLOG(INFO) * refactor GetStreamHandles to return a vector by value * adjust HexagonBufferManager methods; use thread_manager file names * style guidelines and debug prints * reinterpret cast for TVMStreamHandle * end member variables with underscore Co-authored-by: Joseph McMahan --- src/runtime/hexagon/hexagon_buffer_manager.h | 81 +++++ src/runtime/hexagon/hexagon_device_api.cc | 29 +- src/runtime/hexagon/hexagon_device_api.h | 23 +- src/runtime/hexagon/hexagon_thread_manager.cc | 291 ++++++++++++++++ src/runtime/hexagon/hexagon_thread_manager.h | 194 +++++++++++ .../hexagon/hexagon_thread_manager_tests.cc | 324 ++++++++++++++++++ 6 files changed, 901 insertions(+), 41 deletions(-) create mode 100644 src/runtime/hexagon/hexagon_buffer_manager.h create mode 100644 src/runtime/hexagon/hexagon_thread_manager.cc create mode 100644 src/runtime/hexagon/hexagon_thread_manager.h create mode 100644 tests/cpp-runtime/hexagon/hexagon_thread_manager_tests.cc diff --git a/src/runtime/hexagon/hexagon_buffer_manager.h b/src/runtime/hexagon/hexagon_buffer_manager.h new file mode 100644 index 000000000000..658a39fac8a8 --- /dev/null +++ b/src/runtime/hexagon/hexagon_buffer_manager.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef TVM_RUNTIME_HEXAGON_HEXAGON_BUFFER_MANAGER_H_ +#define TVM_RUNTIME_HEXAGON_HEXAGON_BUFFER_MANAGER_H_ + +#include + +#include +#include +#include + +#include "hexagon_buffer.h" + +namespace tvm { +namespace runtime { +namespace hexagon { + +class HexagonBufferManager { + public: + /*! + * \brief Free a HexagonBuffer. + * \param ptr Address of the HexagonBuffer as returned by `AllocateHexagonBuffer`. + */ + void FreeHexagonBuffer(void* ptr) { + auto it = hexagon_buffer_map_.find(ptr); + CHECK(it != hexagon_buffer_map_.end()) + << "Attempt made to free unknown or already freed dataspace allocation"; + CHECK(it->second != nullptr); + hexagon_buffer_map_.erase(it); + } + /*! + * \brief Allocate a HexagonBuffer. + * \param args Templated arguments to pass through to HexagonBuffer constructor. + */ + template + void* AllocateHexagonBuffer(Args&&... args) { + auto buf = std::make_unique(std::forward(args)...); + void* ptr = buf->GetPointer(); + hexagon_buffer_map_.insert({ptr, std::move(buf)}); + return ptr; + } + + //! \brief Returns whether the HexagonBuffer is in the map. + size_t count(void* ptr) { return hexagon_buffer_map_.count(ptr); } + + //! \brief Returns an iterator to the HexagonBuffer within the map. + HexagonBuffer* find(void* ptr) { + auto it = hexagon_buffer_map_.find(ptr); + if (it != hexagon_buffer_map_.end()) { + return it->second.get(); + } + return nullptr; + } + + private: + //! \brief Contains the HexagonBuffer objects managed by this class. + std::unordered_map> hexagon_buffer_map_; +}; + +} // namespace hexagon +} // namespace runtime +} // namespace tvm + +#endif // TVM_RUNTIME_HEXAGON_HEXAGON_BUFFER_MANAGER_H_ diff --git a/src/runtime/hexagon/hexagon_device_api.cc b/src/runtime/hexagon/hexagon_device_api.cc index c9c1586008e3..92a7b22784fb 100644 --- a/src/runtime/hexagon/hexagon_device_api.cc +++ b/src/runtime/hexagon/hexagon_device_api.cc @@ -32,7 +32,6 @@ #include #include "../workspace_pool.h" -#include "hexagon_buffer.h" #include "hexagon_common.h" namespace tvm { @@ -74,14 +73,14 @@ void* HexagonDeviceAPI::AllocDataSpace(Device dev, int ndim, const int64_t* shap } if (ndim == 0) { - return AllocateHexagonBuffer(typesize, alignment, mem_scope); + return hexbuffs.AllocateHexagonBuffer(typesize, alignment, mem_scope); } else if (ndim == 1) { size_t nbytes = shape[0] * typesize; - return AllocateHexagonBuffer(nbytes, alignment, mem_scope); + return hexbuffs.AllocateHexagonBuffer(nbytes, alignment, mem_scope); } else if (ndim == 2) { size_t nallocs = shape[0]; size_t nbytes = shape[1] * typesize; - return AllocateHexagonBuffer(nallocs, nbytes, alignment, mem_scope); + return hexbuffs.AllocateHexagonBuffer(nallocs, nbytes, alignment, mem_scope); } else { LOG(FATAL) << "Hexagon Device API supports only 1d and 2d allocations, but received ndim = " << ndim; @@ -97,13 +96,13 @@ void* HexagonDeviceAPI::AllocDataSpace(Device dev, size_t nbytes, size_t alignme if (alignment < kHexagonAllocAlignment) { alignment = kHexagonAllocAlignment; } - return AllocateHexagonBuffer(nbytes, alignment, String("global")); + return hexbuffs.AllocateHexagonBuffer(nbytes, alignment, String("global")); } void HexagonDeviceAPI::FreeDataSpace(Device dev, void* ptr) { CHECK(ptr) << "buffer pointer is null"; CHECK(IsValidDevice(dev)) << "dev.device_type: " << dev.device_type; - FreeHexagonBuffer(ptr); + hexbuffs.FreeHexagonBuffer(ptr); } // WorkSpace: runtime allocations for Hexagon @@ -119,7 +118,7 @@ void* HexagonDeviceAPI::AllocWorkspace(Device dev, size_t size, DLDataType type_ void HexagonDeviceAPI::FreeWorkspace(Device dev, void* data) { CHECK(IsValidDevice(dev)) << "dev.device_type: " << dev.device_type; - CHECK(hexagon_buffer_map_.count(data) != 0) + CHECK(hexbuffs.count(data) != 0) << "Attempt made to free unknown or already freed workspace allocation"; dmlc::ThreadLocalStore::Get()->FreeWorkspace(dev, data); } @@ -143,13 +142,7 @@ void HexagonDeviceAPI::CopyDataFromTo(DLTensor* from, DLTensor* to, TVMStreamHan CHECK_EQ(to->byte_offset, 0); CHECK_EQ(GetDataSize(*from), GetDataSize(*to)); - auto lookup_hexagon_buffer = [this](void* ptr) -> HexagonBuffer* { - auto it = this->hexagon_buffer_map_.find(ptr); - if (it != this->hexagon_buffer_map_.end()) { - return it->second.get(); - } - return nullptr; - }; + auto lookup_hexagon_buffer = [this](void* ptr) -> HexagonBuffer* { return hexbuffs.find(ptr); }; HexagonBuffer* hex_from_buf = lookup_hexagon_buffer(from->data); HexagonBuffer* hex_to_buf = lookup_hexagon_buffer(to->data); @@ -172,14 +165,6 @@ void HexagonDeviceAPI::CopyDataFromTo(const void* from, size_t from_offset, void memcpy(static_cast(to) + to_offset, static_cast(from) + from_offset, size); } -void HexagonDeviceAPI::FreeHexagonBuffer(void* ptr) { - auto it = hexagon_buffer_map_.find(ptr); - CHECK(it != hexagon_buffer_map_.end()) - << "Attempt made to free unknown or already freed dataspace allocation"; - CHECK(it->second != nullptr); - hexagon_buffer_map_.erase(it); -} - TVM_REGISTER_GLOBAL("device_api.hexagon.mem_copy").set_body([](TVMArgs args, TVMRetValue* rv) { void* dst = args[0]; void* src = args[1]; diff --git a/src/runtime/hexagon/hexagon_device_api.h b/src/runtime/hexagon/hexagon_device_api.h index 6f65bf402757..4da12e35fbe7 100644 --- a/src/runtime/hexagon/hexagon_device_api.h +++ b/src/runtime/hexagon/hexagon_device_api.h @@ -30,6 +30,7 @@ #include #include "hexagon_buffer.h" +#include "hexagon_buffer_manager.h" namespace tvm { namespace runtime { @@ -72,7 +73,7 @@ class HexagonDeviceAPI final : public DeviceAPI { */ void* AllocWorkspace(Device dev, size_t size, DLDataType type_hint) final; - //! Erase from tracked hexagon_buffer_map and free + //! Erase from HexagonBufferManager and free void FreeWorkspace(Device dev, void* data) final; /*! @@ -127,18 +128,6 @@ class HexagonDeviceAPI final : public DeviceAPI { TVMStreamHandle stream) final; private: - /*! \brief Helper to allocate a HexagonBuffer and register the result - * in the owned buffer map. - * \return Raw data storage managed by the hexagon buffer - */ - template - void* AllocateHexagonBuffer(Args&&... args) { - auto buf = std::make_unique(std::forward(args)...); - void* ptr = buf->GetPointer(); - hexagon_buffer_map_.insert({ptr, std::move(buf)}); - return ptr; - } - /*! \brief Helper to check if the device type is valid for the Hexagon Device API * \return Boolean indicating whether the device type is valid */ @@ -148,12 +137,8 @@ class HexagonDeviceAPI final : public DeviceAPI { (DLDeviceType(dev.device_type) == kDLCPU); } - /*! \brief Helper to free a HexagonBuffer and unregister the result - * from the owned buffer map. - */ - void FreeHexagonBuffer(void* ptr); - //! Lookup table for the HexagonBuffer managing an allocation. - std::unordered_map> hexagon_buffer_map_; + //! \brief Manages underlying HexagonBuffer allocations + HexagonBufferManager hexbuffs; }; } // namespace hexagon } // namespace runtime diff --git a/src/runtime/hexagon/hexagon_thread_manager.cc b/src/runtime/hexagon/hexagon_thread_manager.cc new file mode 100644 index 000000000000..5d67b142e575 --- /dev/null +++ b/src/runtime/hexagon/hexagon_thread_manager.cc @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "hexagon_thread_manager.h" + +namespace tvm { +namespace runtime { +namespace hexagon { + +HexagonThreadManager::HexagonThreadManager(unsigned num_threads, unsigned thread_stack_size_bytes, + unsigned thread_pipe_size_words) { + // Note: could technically manage more software threads than allowable hardware threads, but there + // is no system constant defined + // in the qurt libs for that maximum. + CHECK(num_threads); + CHECK_LE(num_threads, QURT_MAX_HTHREAD_LIMIT); + nthreads_ = num_threads; + + CHECK_GE(thread_stack_size_bytes, MIN_STACK_SIZE_BYTES); + CHECK_LE(thread_stack_size_bytes, MAX_STACK_SIZE_BYTES); + + CHECK_GE(thread_pipe_size_words, MIN_PIPE_SIZE_WORDS); + CHECK_LE(thread_pipe_size_words, MAX_PIPE_SIZE_WORDS); + + DLOG(INFO) << "Spawning threads"; + SpawnThreads(thread_stack_size_bytes, thread_pipe_size_words); + + // Initially, block all threads until we get the Start() call + qurt_sem_init_val(&start_semaphore_, 0); + for (unsigned i = 0; i < nthreads_; i++) { + Dispatch(reinterpret_cast(i), thread_wait, &start_semaphore_); + } +} + +HexagonThreadManager::~HexagonThreadManager() { + // In case Start() was never explicitly called, call it now to prevent deadlock + if (qurt_sem_get_val(&start_semaphore_) == 0) { + Start(); + } + + DLOG(INFO) << "Threads started"; + + // dispatch a command to each thread to exit with status 0 + for (unsigned i = 0; i < nthreads_; i++) { + bool success = Dispatch(reinterpret_cast(i), thread_exit, nullptr); + while (!success) { + success = Dispatch(reinterpret_cast(i), thread_exit, nullptr); + } + } + + DLOG(INFO) << "Threads exited"; + + // join with each thread (wait for them to terminate); if already exited, the call returns + // immediately + int status; // don't actually care what the thread exit status was + for (unsigned i = 0; i < nthreads_; i++) { + qurt_thread_join(threads_[i], &status); + } + + DLOG(INFO) << "Threads joined"; + + // Destroy semaphores + qurt_sem_destroy(&start_semaphore_); + for (auto it : semaphores_) { + qurt_sem_destroy(it.second); + free(it.second); + } + + DLOG(INFO) << "Semaphores destroyed"; + + // Delete pipe objects and contexts + for (unsigned i = 0; i < nthreads_; i++) { + qurt_pipe_destroy(&pipes_[i]); + delete contexts_[i]; + } + + DLOG(INFO) << "Pipes and contexts deleted"; + + // Dealloc memory blocks + hexbuffs_.FreeHexagonBuffer(stack_buffer_); + hexbuffs_.FreeHexagonBuffer(pipe_buffer_); + + DLOG(INFO) << "Buffers freed"; +} + +void HexagonThreadManager::SpawnThreads(unsigned thread_stack_size_bytes, + unsigned thread_pipe_size_words) { + // allocate all stack space for threads + stack_buffer_ = hexbuffs_.AllocateHexagonBuffer(thread_stack_size_bytes * nthreads_, + MEM_ALIGNMENT, String("global")); + // allocate space for pipe buffers (command queues) + unsigned thread_pipe_size_bytes = thread_pipe_size_words * sizeof(qurt_pipe_data_t); + pipe_buffer_ = hexbuffs_.AllocateHexagonBuffer(thread_pipe_size_bytes * nthreads_, MEM_ALIGNMENT, + String("global")); + + threads_.resize(nthreads_); + pipes_.resize(nthreads_); + contexts_.resize(nthreads_); + + DLOG(INFO) << "Buffers allocated"; + + // First, create pipe resources for all threads + char* next_pipe_start = reinterpret_cast(pipe_buffer_); + for (unsigned i = 0; i < nthreads_; i++) { + qurt_pipe_attr_t pipe_attr; + qurt_pipe_attr_init(&pipe_attr); + qurt_pipe_attr_set_buffer(&pipe_attr, reinterpret_cast(next_pipe_start)); + next_pipe_start += thread_pipe_size_bytes; + qurt_pipe_attr_set_buffer_partition(&pipe_attr, QURT_PIPE_ATTR_MEM_PARTITION_RAM); + qurt_pipe_attr_set_elements(&pipe_attr, thread_pipe_size_words); + + // create the pipe + int rc = qurt_pipe_init(&pipes_[i], &pipe_attr); + CHECK_EQ(rc, QURT_EOK); + } + + DLOG(INFO) << "Pipes created"; + + // Create all threads + char* next_stack_start = reinterpret_cast(stack_buffer_); + for (unsigned i = 0; i < nthreads_; i++) { + // create initialize the thread attr + qurt_thread_attr_t thread_attr; + char name[32]; + qurt_thread_attr_init(&thread_attr); + qurt_thread_attr_set_stack_addr(&thread_attr, next_stack_start); + qurt_thread_attr_set_stack_size(&thread_attr, thread_stack_size_bytes); + snprintf(name, sizeof(name), "thread %d", i); + qurt_thread_attr_set_name(&thread_attr, name); + next_stack_start += thread_stack_size_bytes; + + // create the thread + contexts_[i] = new ThreadContext(&pipes_[i], i); + int rc = qurt_thread_create(&threads_[i], &thread_attr, thread_main, contexts_[i]); + CHECK_EQ(rc, QURT_EOK); + } + + DLOG(INFO) << "Threads created"; +} + +const std::vector HexagonThreadManager::GetStreamHandles() { + std::vector out; + for (unsigned i = 0; i < nthreads_; i++) { + // threads identified by index into `threads` array + out.push_back(reinterpret_cast(i)); + } + return out; +} + +bool HexagonThreadManager::Dispatch(TVMStreamHandle stream, voidfunc f, void* args) { + unsigned thread = reinterpret_cast(stream); + DLOG(INFO) << "Dispatching to stream " << thread; + Command* cmd = new Command(f, args); // Command object freed by receiving thread + qurt_pipe_data_t msg = (qurt_pipe_data_t)(cmd); + qurt_pipe_t* pipeAddr = &pipes_[thread]; + + int trysend = qurt_pipe_try_send(pipeAddr, msg); + return trysend == 0; +} + +void HexagonThreadManager::Start() { thread_signal(&start_semaphore_); } + +void HexagonThreadManager::WaitOnThreads() { + // Using standard signal mechanism to block the "main" thread on all worker threads. + // Note: this would be slightly more efficient as a barrier, but would need some extra code to + // wait on the barrier that would only be used once. + + // In case Start() was never explicitly called, call it now to prevent deadlock + if (qurt_sem_get_val(&start_semaphore_) == 0) { + Start(); + } + + std::vector finished; + finished.resize(nthreads_); + + // initialize one semaphore for each thread + for (unsigned i = 0; i < nthreads_; i++) { + qurt_sem_init_val(&finished[i], 0); + } + // dispatch signal() command to each thread on their private semaphore + for (unsigned i = 0; i < nthreads_; i++) { + bool success = Dispatch(reinterpret_cast(i), thread_signal, &finished[i]); + while (!success) { + success = Dispatch(reinterpret_cast(i), thread_signal, &finished[i]); + } + } + // wait on each semaphore, one at a time + for (unsigned i = 0; i < nthreads_; i++) { + thread_wait(&finished[i]); + } + + // clean up + for (unsigned i = 0; i < nthreads_; i++) { + qurt_sem_destroy(&finished[i]); + } +} + +void HexagonThreadManager::CheckSemaphore(unsigned syncID) { + if (semaphores_.find(syncID) == semaphores_.end()) { + semaphores_[syncID] = reinterpret_cast(malloc(sizeof(qurt_sem_t))); + qurt_sem_init_val(semaphores_[syncID], 0); + } +} + +bool HexagonThreadManager::Signal(TVMStreamHandle thread, SyncPoint syncID) { + CheckSemaphore(syncID); + DLOG(INFO) << "Dispatching signal to thread " << thread << " on semaphore ID " << syncID + << " located @ 0x" << std::hex << semaphores_[syncID]; + return Dispatch(thread, thread_signal, semaphores_[syncID]); +} + +bool HexagonThreadManager::Wait(TVMStreamHandle thread, SyncPoint syncID) { + CheckSemaphore(syncID); + DLOG(INFO) << "Dispatching wait to thread " << thread << " on semaphore ID " << syncID + << " located @ 0x" << std::hex << semaphores_[syncID]; + return Dispatch(thread, thread_wait, semaphores_[syncID]); +} + +/* Create a sync_from_to relationship with a dynamic semaphore allocation. +Makes use of thread_wait_free to also free the semaphore after sync is complete. +*/ +bool HexagonThreadManager::SyncFromTo(TVMStreamHandle signal_thread, TVMStreamHandle wait_thread) { + qurt_sem_t* sem = reinterpret_cast(malloc(sizeof(qurt_sem_t))); + qurt_sem_init_val(sem, 0); + if (Dispatch(signal_thread, thread_signal, sem)) { + return Dispatch(wait_thread, thread_wait_free, sem); + } else { + return false; + } +} + +void HexagonThreadManager::thread_signal(void* semaphore) { + DLOG(INFO) << "Signaling semaphore addr 0x" << std::hex << semaphore; + qurt_sem_add(reinterpret_cast(semaphore), QURT_MAX_HTHREAD_LIMIT); +} + +void HexagonThreadManager::thread_wait(void* semaphore) { + DLOG(INFO) << "Waiting on semaphore addr 0x" << std::hex << semaphore; + qurt_sem_down(reinterpret_cast(semaphore)); +} + +/* Wait on the passed semaphore object, then free it. */ +void HexagonThreadManager::thread_wait_free(void* semaphore) { + qurt_sem_down(reinterpret_cast(semaphore)); // blocks until signal is complete + qurt_sem_destroy(reinterpret_cast(semaphore)); + free(semaphore); +} + +void HexagonThreadManager::thread_exit(void* status) { + DLOG(INFO) << "thread exiting"; + qurt_thread_exit((uint64_t)status); +} + +void HexagonThreadManager::thread_main(void* context) { + ThreadContext* tc = static_cast(context); + unsigned index = tc->index; + qurt_pipe_t* mypipe = tc->pipe; + + DLOG(INFO) << "Thread " << index << " spawned"; + + while (true) { // loop, executing commands from pipe + DLOG(INFO) << "Thread " << index << " receiving command"; + qurt_pipe_data_t msg = qurt_pipe_receive(mypipe); // blocks if empty + Command* cmd = reinterpret_cast(msg); + voidfunc f = cmd->f; + void* args = cmd->args; + delete cmd; + f(args); + } + // thread exit is handled by dispatching an exit command +} + +} // namespace hexagon +} // namespace runtime +} // namespace tvm diff --git a/src/runtime/hexagon/hexagon_thread_manager.h b/src/runtime/hexagon/hexagon_thread_manager.h new file mode 100644 index 000000000000..3422fef3879e --- /dev/null +++ b/src/runtime/hexagon/hexagon_thread_manager.h @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef TVM_RUNTIME_HEXAGON_HEXAGON_THREAD_MANAGER_H_ +#define TVM_RUNTIME_HEXAGON_HEXAGON_THREAD_MANAGER_H_ + +#include +#include +#include + +#include +#include +#include +#include + +#include "hexagon_buffer.h" +#include "hexagon_buffer_manager.h" +#include "hexagon_common.h" +#include "qurt.h" + +namespace tvm { +namespace runtime { +namespace hexagon { + +class HexagonThreadManager { + //! \brief Void function. + using voidfunc = void (*)(void*); + //! \brief Semaphore ID. + using SyncPoint = unsigned; + //! \brief Alignment of underlying memory allocations. + const unsigned MEM_ALIGNMENT = 32; + //! \brief Minimum stack size in bytes per thread. + const unsigned MIN_STACK_SIZE_BYTES = 0x400; // 1KB + //! \brief Maximum stack size in bytes per thread. + const unsigned MAX_STACK_SIZE_BYTES = 0x10000; // 64KB + //! \brief Minimum pipe (or command buffer) size in words (or commands) per thread. + const unsigned MIN_PIPE_SIZE_WORDS = 10; + //! \brief Maximum pipe (or command buffer) size in words (or commands) per thread. + const unsigned MAX_PIPE_SIZE_WORDS = 0x10000; // 64K words + + public: + /*! + * \brief Spawn a number of Hexagon threads with a given stack (in bytes) and pipe (a.k.a. command + * buffer; in words or commands) within the min and max values specified above. + * \param num_threads Number of threads to spawn. + * \param thread_stack_size_bytes Stack size in bytes per thread. + * \param thread_pipe_size_words Pipe (or command buffer) size in words (or commands). + */ + HexagonThreadManager(unsigned, unsigned thread_stack_size_bytes, unsigned thread_pipe_size_words); + + //! \brief Destructor + ~HexagonThreadManager(); + + /*! + * \brief Get the spawned threads as stream handles. + * \returns Vector of stream handles. + */ + const std::vector GetStreamHandles(); + + /*! + * \brief Non-blocking dispatch of a void function and args on a given thread. + * \param thread Stream handle of the thread on which to dispatch the void function. + * \param f Void function to be dispatched. + * \param args Arguments to pass to the void function. + * \returns Boolean value indicating success or failure of the dispatch; user must either 1) + * `Start` threads executing to clear space in the pipe before retrying dispatch or 2) create a + * `HexagonThreadManager` with a larger pipe. + */ + bool Dispatch(TVMStreamHandle thread, voidfunc f, void* args); + /*! + * \brief Non-blocking signal of a semaphore with a given ID. + * \param thread Stream handle of the thread which will signal the semaphore. + * \param syncID ID of the semaphore to be signaled. + * \returns Boolean value indicating success or failure of the dispatch of the signal; user must + * either 1) `Start` threads executing to clear space in the pipe before retrying dispatch or 2) + * create a `HexagonThreadManager` with a larger pipe. + */ + bool Signal(TVMStreamHandle thread, SyncPoint syncID); + /*! + * \brief Non-blocking wait on a semaphore with a given ID. + * \param thread Stream handle of the thread which will wait on the semaphore. + * \param syncID ID of the semaphore on which to wait. + * \returns Boolean value indicating success or failure of the dispatch of the wait; user must + * either 1) `Start` threads executing to clear space in the pipe before retrying dispatch or 2) + * create a `HexagonThreadManager` with a larger pipe. + */ + bool Wait(TVMStreamHandle thread, SyncPoint syncID); + /*! + * \brief Creates a synchronization point between two threads by creating a semaphore, + *dispatching the `signal_thread` to signal that semaphore and dispatching the `wait_thread to + *wait on that semaphore. + * \param signal_thread Stream handle for the thread which will signal the + *semaphore. + * \param wait_thread Stream handle for the thread which will wait on the semaphore. + * \returns Boolean value indicating success or failure of the combined dispatch of both the + *signal and the wait; user must either 1) `Start` threads executing to clear space in the pipe + *before retrying dispatch or 2) create a `HexagonThreadManager` with a larger pipe. + */ + bool SyncFromTo(TVMStreamHandle signal_thread, TVMStreamHandle wait_thread); + //! \brief Unblock threads to start execution. + void Start(); + //! \brief Unblock threads to start execution if `Start` has not already been called; blocking + //! call to wait until all threads have empty pipes. + void WaitOnThreads(); + + private: + struct ThreadContext { + qurt_pipe_t* pipe; + unsigned index; + ThreadContext(qurt_pipe_t* pipe, unsigned index) : pipe(pipe), index(index) {} + }; + + //! \brief Helper function for the constructor to spawn threads. + void SpawnThreads(unsigned thread_stack_size_bytes, unsigned thread_pipe_size_words); + + //! \brief Helper function for `Signal` and `Wait` to create, initialize and map semaphores by ID. + void CheckSemaphore(unsigned syncID); + + //! \brief Void function executed by a thread to signal a semaphore. + static void thread_signal(void* semaphore); + + //! \brief Void function executed by a thread to wait on a semaphore; used by `Wait`. + static void thread_wait(void* semaphore); + + //! \brief Void function executed by a thread to wait on and free a semaphore; used by + //! `SyncFromTo`. + static void thread_wait_free(void* semaphore); + + //! \brief Void function executed by a thread to exit at time of destruction. + static void thread_exit(void* status); + + //! \brief Void function executed by each thread as `main`. + static void thread_main(void* context); + + //! \brief Manages underlying HexagonBuffer allocations. + HexagonBufferManager hexbuffs_; + + //! \brief Number of threads allocatted. + unsigned nthreads_{0}; + + //! \brief Pointer to the base of the stacks allocated for all threads; size = `nthreads` * + //! `thread_stack_size_bytes`. + void* stack_buffer_{nullptr}; + + //! \brief Pointer to the base of the pipes (or command buffers) allocated for all threads; size = + //! `nthreads` * `thread_pipe_size_words` * sizeof(word). + void* pipe_buffer_{nullptr}; + + //! \brief QURT thread structure for each spawned thread. + std::vector threads_; + + //! \brief QURT pipe (or command buffer) structure for each spawned thread. + std::vector pipes_; + + //! \brief Thread context passed into each `thread_main` function. + std::vector contexts_; + + //! \brief Semaphores used by `Signal` and `Wait` mapped by ID. + std::unordered_map semaphores_; + + //! \brief Start semaphore created at time of construction; signled by `Start`. + qurt_sem_t start_semaphore_; + + /*! + *\brief Encapsulate a void function pointer + arg pointer; sent via pipe to threads to execute. + */ + struct Command { + voidfunc f; + void* args; + Command(voidfunc f, void* args) : f(f), args(args) {} + }; +}; + +} // namespace hexagon +} // namespace runtime +} // namespace tvm + +#endif // TVM_RUNTIME_HEXAGON_HEXAGON_THREAD_MANAGER_H_ diff --git a/tests/cpp-runtime/hexagon/hexagon_thread_manager_tests.cc b/tests/cpp-runtime/hexagon/hexagon_thread_manager_tests.cc new file mode 100644 index 000000000000..aa86e4638df3 --- /dev/null +++ b/tests/cpp-runtime/hexagon/hexagon_thread_manager_tests.cc @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include "../src/runtime/hexagon/hexagon_thread_manager.h" + +using namespace tvm::runtime; +using namespace tvm::runtime::hexagon; + +class HexagonThreadManagerTest : public ::testing::Test { + protected: + void SetUp() override { + htm = new HexagonThreadManager(threads, stack_size, pipe_size); + streams = htm->GetStreamHandles(); + } + void TearDown() override { delete htm; } + HexagonThreadManager* htm{nullptr}; + std::vector streams; + int answer{0}; + const unsigned threads{6}; + const unsigned pipe_size{100}; + const unsigned stack_size{0x4000}; // 16KB +}; + +TEST_F(HexagonThreadManagerTest, ctor_errors) { + // zero threads + ASSERT_THROW(HexagonThreadManager(0, stack_size, pipe_size), InternalError); + // too many threads + ASSERT_THROW(HexagonThreadManager(0x10000000, stack_size, pipe_size), InternalError); + // stack too small + ASSERT_THROW(HexagonThreadManager(6, 0, pipe_size), InternalError); + // stack too big + ASSERT_THROW(HexagonThreadManager(6, 0x10000000, pipe_size), InternalError); + // pipe too small + ASSERT_THROW(HexagonThreadManager(6, stack_size, 9), InternalError); + // pipe too big + ASSERT_THROW(HexagonThreadManager(6, stack_size, 0x10000000), InternalError); +} + +TEST_F(HexagonThreadManagerTest, init) { + CHECK(htm != nullptr); + CHECK_EQ(streams.size(), threads); +} + +void get_the_answer(void* answer) { *reinterpret_cast(answer) = 42; } + +TEST_F(HexagonThreadManagerTest, dispatch) { + htm->Dispatch(streams[0], get_the_answer, &answer); + htm->Start(); + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, dispatch_wait) { + htm->Dispatch(streams[0], get_the_answer, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, wait_signal) { + htm->Wait(streams[0], 0); + htm->Signal(streams[1], 0); + htm->Dispatch(streams[0], get_the_answer, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, re_signal) { + htm->Wait(streams[0], 0); + htm->Signal(streams[1], 0); + htm->Signal(streams[1], 0); + htm->Dispatch(streams[0], get_the_answer, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, re_wait) { + htm->Wait(streams[0], 0); + htm->Signal(streams[1], 0); + htm->Wait(streams[0], 0); + htm->Dispatch(streams[0], get_the_answer, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, wait_signal_x2) { + htm->Wait(streams[0], 0); + htm->Signal(streams[1], 0); + htm->Wait(streams[0], 1); + htm->Signal(streams[1], 1); + htm->Dispatch(streams[0], get_the_answer, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, signal_wait) { + htm->Signal(streams[1], 0); + htm->Wait(streams[0], 0); + htm->Dispatch(streams[0], get_the_answer, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, sync_from_to) { + htm->SyncFromTo(streams[1], streams[0]); + htm->Dispatch(streams[0], get_the_answer, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, sync_from_to_self) { + htm->SyncFromTo(streams[0], streams[0]); + htm->Dispatch(streams[0], get_the_answer, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, sync_from_to_x2) { + htm->SyncFromTo(streams[0], streams[1]); + htm->SyncFromTo(streams[1], streams[0]); + htm->Dispatch(streams[0], get_the_answer, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, sync_from_to_all) { + htm->SyncFromTo(streams[5], streams[4]); + htm->SyncFromTo(streams[4], streams[3]); + htm->SyncFromTo(streams[3], streams[2]); + htm->SyncFromTo(streams[2], streams[1]); + htm->SyncFromTo(streams[1], streams[0]); + htm->Dispatch(streams[0], get_the_answer, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, pipe_fill) { + // fill the pipe + for (int i = 0; i < pipe_size; ++i) { + htm->Dispatch(streams[0], get_the_answer, &answer); + } + htm->WaitOnThreads(); + CHECK_EQ(answer, 42); +} + +TEST_F(HexagonThreadManagerTest, pipe_overflow) { + // fill the pipe + for (int i = 0; i < pipe_size; ++i) { + htm->Dispatch(streams[0], get_the_answer, &answer); + } + // overflow the pipe + bool space = htm->Dispatch(streams[0], get_the_answer, &answer); + CHECK_EQ(space, false); +} + +void increment(void* voidptr) { + int* intptr = reinterpret_cast(voidptr); + *intptr = *intptr + 1; +} + +TEST_F(HexagonThreadManagerTest, producer_consumer) { + htm->Dispatch(streams[5], increment, &answer); + htm->SyncFromTo(streams[5], streams[4]); + htm->Dispatch(streams[4], increment, &answer); + htm->SyncFromTo(streams[4], streams[3]); + htm->Dispatch(streams[3], increment, &answer); + htm->SyncFromTo(streams[3], streams[2]); + htm->Dispatch(streams[2], increment, &answer); + htm->SyncFromTo(streams[2], streams[1]); + htm->Dispatch(streams[1], increment, &answer); + htm->SyncFromTo(streams[1], streams[0]); + htm->Dispatch(streams[0], increment, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 6); +} + +TEST_F(HexagonThreadManagerTest, producer_consumer_signal_wait) { + htm->Wait(streams[0], 0); + htm->Wait(streams[1], 1); + htm->Wait(streams[2], 2); + htm->Wait(streams[3], 3); + htm->Wait(streams[4], 4); + + htm->Dispatch(streams[5], increment, &answer); + htm->Signal(streams[5], 4); + htm->Dispatch(streams[4], increment, &answer); + htm->Signal(streams[4], 3); + htm->Dispatch(streams[3], increment, &answer); + htm->Signal(streams[3], 2); + htm->Dispatch(streams[2], increment, &answer); + htm->Signal(streams[2], 1); + htm->Dispatch(streams[1], increment, &answer); + htm->Signal(streams[1], 0); + htm->Dispatch(streams[0], increment, &answer); + htm->WaitOnThreads(); + CHECK_EQ(answer, 6); +} + +struct ToAppend { + std::vector* arr; + int value; + ToAppend(std::vector* addr, int value) : arr(addr), value(value){}; +}; + +void append(void* toappend) { + ToAppend* cmd = reinterpret_cast(toappend); + cmd->arr->push_back(cmd->value); +} + +TEST_F(HexagonThreadManagerTest, thread_order) { + std::vector arr; + + ToAppend cmd0(&arr, 0); + htm->Dispatch(streams[0], append, &cmd0); + htm->SyncFromTo(streams[0], streams[1]); + + ToAppend cmd1(&arr, 1); + htm->Dispatch(streams[1], append, &cmd1); + htm->SyncFromTo(streams[1], streams[2]); + + ToAppend cmd2(&arr, 2); + htm->Dispatch(streams[2], append, &cmd2); + htm->SyncFromTo(streams[2], streams[3]); + + ToAppend cmd3(&arr, 3); + htm->Dispatch(streams[3], append, &cmd3); + htm->SyncFromTo(streams[3], streams[4]); + + ToAppend cmd4(&arr, 4); + htm->Dispatch(streams[4], append, &cmd4); + htm->SyncFromTo(streams[4], streams[5]); + + ToAppend cmd5(&arr, 5); + htm->Dispatch(streams[5], append, &cmd5); + htm->WaitOnThreads(); + for (int i = 0; i < threads; ++i) { + CHECK_EQ(arr[i], i); + } +} + +TEST_F(HexagonThreadManagerTest, thread_order_signal_wait) { + std::vector arr; + + htm->Wait(streams[1], 1); + htm->Wait(streams[2], 2); + htm->Wait(streams[3], 3); + htm->Wait(streams[4], 4); + htm->Wait(streams[5], 5); + + ToAppend cmd0(&arr, 0); + htm->Dispatch(streams[0], append, &cmd0); + htm->Signal(streams[0], 1); + + ToAppend cmd1(&arr, 1); + htm->Dispatch(streams[1], append, &cmd1); + htm->Signal(streams[1], 2); + + ToAppend cmd2(&arr, 2); + htm->Dispatch(streams[2], append, &cmd2); + htm->Signal(streams[2], 3); + + ToAppend cmd3(&arr, 3); + htm->Dispatch(streams[3], append, &cmd3); + htm->Signal(streams[3], 4); + + ToAppend cmd4(&arr, 4); + htm->Dispatch(streams[4], append, &cmd4); + htm->Signal(streams[4], 5); + + ToAppend cmd5(&arr, 5); + htm->Dispatch(streams[5], append, &cmd5); + htm->WaitOnThreads(); + for (int i = 0; i < threads; ++i) { + CHECK_EQ(arr[i], i); + } +} + +struct ToWrite { + int* addr; + int value; + ToWrite(int* addr, int value) : addr(addr), value(value){}; +}; + +void thread_write_val(void* towrite) { + ToWrite* cmd = reinterpret_cast(towrite); + *(cmd->addr) = cmd->value; + delete cmd; +} + +TEST_F(HexagonThreadManagerTest, dispatch_writes) { + std::vector array; + std::vector truth; + array.resize(streams.size()); + truth.resize(streams.size()); + for (int i = 0; i < streams.size(); i++) { + int val = i * 2; + ToWrite* cmd = new ToWrite(&array[i], val); + htm->Dispatch(streams[i], thread_write_val, cmd); + truth[i] = val; + } + htm->Start(); + htm->WaitOnThreads(); + for (int i = 0; i < streams.size(); i++) { + CHECK_EQ(array[i], truth[i]); + } +}