Skip to content

Commit

Permalink
Backport deallocate slots on worker threads (#362)
Browse files Browse the repository at this point in the history
* Backport deallocate slots on worker threads

* Add changelog

* Up-port another change from the 1.27 patch

* Comment upstream source of the patch

* Whoops. commented the wrong line
  • Loading branch information
ashishb-solo committed Aug 14, 2024
1 parent bd43246 commit 466acb5
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 1 deletion.
296 changes: 296 additions & 0 deletions bazel/foreign_cc/0003-deallocate-slots-on-worker-threads.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc
index 260a08820c..292837a373 100644
--- source/common/runtime/runtime_features.cc
+++ source/common/runtime/runtime_features.cc
@@ -106,6 +106,7 @@ RUNTIME_GUARD(envoy_reloadable_features_validate_connect);
RUNTIME_GUARD(envoy_reloadable_features_validate_grpc_header_before_log_grpc_status);
RUNTIME_GUARD(envoy_reloadable_features_validate_upstream_headers);
RUNTIME_GUARD(envoy_restart_features_allow_client_socket_creation_failure);
+RUNTIME_GUARD(envoy_restart_features_allow_slot_destroy_on_worker_threads);
RUNTIME_GUARD(envoy_restart_features_quic_handle_certs_with_shared_tls_code);
RUNTIME_GUARD(envoy_restart_features_send_goaway_for_premature_rst_streams);
RUNTIME_GUARD(envoy_restart_features_udp_read_normalize_addresses);
diff --git a/source/common/thread_local/BUILD b/source/common/thread_local/BUILD
index 3e1697e397..726cd13c29 100644
--- source/common/thread_local/BUILD
+++ source/common/thread_local/BUILD
@@ -18,5 +18,6 @@ envoy_cc_library(
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/common:stl_helpers",
+ "//source/common/runtime:runtime_features_lib",
],
)
diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc
index 88981133cf..bd417164b1 100644
--- source/common/thread_local/thread_local_impl.cc
+++ source/common/thread_local/thread_local_impl.cc
@@ -9,12 +9,18 @@

#include "source/common/common/assert.h"
#include "source/common/common/stl_helpers.h"
+#include "source/common/runtime/runtime_features.h"

namespace Envoy {
namespace ThreadLocal {

thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_;

+InstanceImpl::InstanceImpl() {
+ allow_slot_destroy_on_worker_threads_ =
+ Runtime::runtimeFeatureEnabled("envoy.restart_features.allow_slot_destroy_on_worker_threads");
+}
+
InstanceImpl::~InstanceImpl() {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(shutdown_);
@@ -41,6 +47,41 @@ SlotPtr InstanceImpl::allocateSlot() {
InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index)
: parent_(parent), index_(index), still_alive_guard_(std::make_shared<bool>(true)) {}

+InstanceImpl::SlotImpl::~SlotImpl() {
+ // If the runtime feature is disabled then keep the original behavior. This should
+ // be cleaned up when the runtime feature
+ // "envoy.restart_features.allow_slot_destroy_on_worker_threads" is deprecated.
+ if (!parent_.allow_slot_destroy_on_worker_threads_) {
+ parent_.removeSlot(index_);
+ return;
+ }
+
+ // Do nothing if the parent is already shutdown. Return early here to avoid accessing the main
+ // thread dispatcher because it may have been destroyed.
+ if (isShutdown()) {
+ return;
+ }
+
+ auto* main_thread_dispatcher = parent_.main_thread_dispatcher_;
+ // Main thread dispatcher may be nullptr if the slot is being created and destroyed during
+ // server initialization.
+ if (!parent_.allow_slot_destroy_on_worker_threads_ || main_thread_dispatcher == nullptr ||
+ main_thread_dispatcher->isThreadSafe()) {
+ // If the slot is being destroyed on the main thread, we can remove it immediately.
+ parent_.removeSlot(index_);
+ } else {
+ // If the slot is being destroyed on a worker thread, we need to post the removal to the
+ // main thread. There are two possible cases here:
+ // 1. The removal is executed on the main thread as expected if the main dispatcher is still
+ // active. This is the common case and the clean up will be done as expected because the
+ // the worker dispatchers must be active before the main dispatcher is exited.
+ // 2. The removal is not executed if the main dispatcher has already exited. This is fine
+ // because the removal has no side effect and will be ignored. The shutdown process will
+ // clean up all the slots anyway.
+ main_thread_dispatcher->post([i = index_, &tls = parent_] { tls.removeSlot(i); });
+ }
+}
+
std::function<void()> InstanceImpl::SlotImpl::wrapCallback(const std::function<void()>& cb) {
// See the header file comments for still_alive_guard_ for the purpose of this capture and the
// expired check below.
diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h
index 25eebe4621..90753101b6 100644
--- source/common/thread_local/thread_local_impl.h
+++ source/common/thread_local/thread_local_impl.h
@@ -19,6 +19,7 @@ namespace ThreadLocal {
*/
class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, public Instance {
public:
+ InstanceImpl();
~InstanceImpl() override;

// ThreadLocal::Instance
@@ -35,7 +36,7 @@ private:
// slot as callbacks drain from workers.
struct SlotImpl : public Slot {
SlotImpl(InstanceImpl& parent, uint32_t index);
- ~SlotImpl() override { parent_.removeSlot(index_); }
+ ~SlotImpl() override;
std::function<void()> wrapCallback(const std::function<void()>& cb);
std::function<void()> dataCallback(const UpdateCb& cb);
static bool currentThreadRegisteredWorker(uint32_t index);
@@ -86,6 +87,8 @@ private:
Event::Dispatcher* main_thread_dispatcher_{};
std::atomic<bool> shutdown_{};

+ bool allow_slot_destroy_on_worker_threads_{};
+
// Test only.
friend class ThreadLocalInstanceImplTest;
};
diff --git a/source/server/server.cc b/source/server/server.cc
index 254cbc0860..ac473a50ab 100644
--- source/server/server.cc
+++ source/server/server.cc
@@ -432,6 +432,12 @@ absl::Status InstanceBase::initializeOrThrow(Network::Address::InstanceConstShar
ENVOY_LOG(info, " {}: {}", ext.first, absl::StrJoin(ext.second->registeredNames(), ", "));
}

+ // The main thread is also registered for thread local updates so that code that does not care
+ // whether it runs on the main thread or on workers can still use TLS.
+ // We do this as early as possible because this has no side effect and could ensure that the
+ // TLS always contains a valid main thread dispatcher when TLS is used.
+ thread_local_.registerThread(*dispatcher_, true);
+
// Handle configuration that needs to take place prior to the main configuration load.
RETURN_IF_NOT_OK(InstanceUtil::loadBootstrapConfig(
bootstrap_, options_, messageValidationContext().staticValidationVisitor(), *api_));
@@ -666,10 +672,6 @@ absl::Status InstanceBase::initializeOrThrow(Network::Address::InstanceConstShar
listener_manager_ = listener_manager_factory->createListenerManager(
*this, nullptr, worker_factory_, bootstrap_.enable_dispatcher_stats(), quic_stat_names_);

- // The main thread is also registered for thread local updates so that code that does not care
- // whether it runs on the main thread or on workers can still use TLS.
- thread_local_.registerThread(*dispatcher_, true);
-
// We can now initialize stats for threading.
stats_store_.initializeThreading(*dispatcher_, thread_local_);

diff --git a/test/common/thread_local/BUILD b/test/common/thread_local/BUILD
index fdd28bc076..725568bc51 100644
--- test/common/thread_local/BUILD
+++ test/common/thread_local/BUILD
@@ -17,5 +17,6 @@ envoy_cc_test(
"//source/common/stats:isolated_store_lib",
"//source/common/thread_local:thread_local_lib",
"//test/mocks/event:event_mocks",
+ "//test/test_common:test_runtime_lib",
],
)
diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc
index 9b5d100160..4ae9409fa4 100644
--- test/common/thread_local/thread_local_impl_test.cc
+++ test/common/thread_local/thread_local_impl_test.cc
@@ -4,12 +4,15 @@
#include "source/common/thread_local/thread_local_impl.h"

#include "test/mocks/event/mocks.h"
+#include "test/test_common/test_runtime.h"

#include "gmock/gmock.h"

using testing::_;
using testing::InSequence;
+using testing::NiceMock;
using testing::Ref;
+using testing::Return;
using testing::ReturnPointee;

namespace Envoy {
@@ -52,10 +55,14 @@ public:
class ThreadLocalInstanceImplTest : public testing::Test {
public:
ThreadLocalInstanceImplTest() {
- tls_.registerThread(main_dispatcher_, true);
- EXPECT_EQ(&main_dispatcher_, &tls_.dispatcher());
+ EXPECT_CALL(main_dispatcher_, isThreadSafe()).WillRepeatedly(Return(true));
+
EXPECT_CALL(thread_dispatcher_, post(_));
tls_.registerThread(thread_dispatcher_, false);
+ // Register the main thread after the worker thread to ensure that the
+ // thread_local_data_.dispatcher_ of current test thread is set to the main thread dispatcher.
+ tls_.registerThread(main_dispatcher_, true);
+ EXPECT_EQ(&main_dispatcher_, &tls_.dispatcher());
}

MOCK_METHOD(ThreadLocalObjectSharedPtr, createThreadLocal, (Event::Dispatcher & dispatcher));
@@ -75,8 +82,8 @@ public:
int freeSlotIndexesListSize() { return tls_.free_slot_indexes_.size(); }
InstanceImpl tls_;

- Event::MockDispatcher main_dispatcher_{"test_main_thread"};
- Event::MockDispatcher thread_dispatcher_{"test_worker_thread"};
+ NiceMock<Event::MockDispatcher> main_dispatcher_{"test_main_thread"};
+ NiceMock<Event::MockDispatcher> thread_dispatcher_{"test_worker_thread"};
};

TEST_F(ThreadLocalInstanceImplTest, All) {
@@ -343,5 +350,90 @@ TEST(ThreadLocalInstanceImplDispatcherTest, Dispatcher) {
tls.shutdownThread();
}

+TEST(ThreadLocalInstanceImplDispatcherTest, DestroySlotOnWorker) {
+ InstanceImpl tls;
+
+ Api::ApiPtr api = Api::createApiForTest();
+ Event::MockDispatcher main_dispatcher{"test_main_thread"};
+ Event::DispatcherPtr thread_dispatcher(api->allocateDispatcher("test_worker_thread"));
+
+ tls.registerThread(main_dispatcher, true);
+ tls.registerThread(*thread_dispatcher, false);
+
+ // Verify we have the expected dispatcher for the main thread.
+ EXPECT_EQ(&main_dispatcher, &tls.dispatcher());
+
+ auto slot = TypedSlot<>::makeUnique(tls);
+
+ Thread::ThreadPtr thread = Thread::threadFactoryForTest().createThread(
+ [&main_dispatcher, &thread_dispatcher, &tls, &slot]() {
+ // Ensure that the dispatcher update in tls posted during the above registerThread happens.
+ thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock);
+ // Verify we have the expected dispatcher for the new thread thread.
+ EXPECT_EQ(thread_dispatcher.get(), &tls.dispatcher());
+
+ // Skip the asserts in the thread. Because the mock dispatcher will call
+ // callbacks directly in current thread and make the ASSERT_IS_MAIN_OR_TEST_THREAD fail.
+ Thread::SkipAsserts skip;
+
+ EXPECT_CALL(main_dispatcher, isThreadSafe()).WillOnce(Return(false));
+ // Destroy the slot on worker thread and expect the post() of main dispatcher to be called.
+ EXPECT_CALL(main_dispatcher, post(_));
+
+ slot.reset();
+
+ thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock);
+ });
+ thread->join();
+
+ // Verify we still have the expected dispatcher for the main thread.
+ EXPECT_EQ(&main_dispatcher, &tls.dispatcher());
+
+ tls.shutdownGlobalThreading();
+ tls.shutdownThread();
+}
+
+TEST(ThreadLocalInstanceImplDispatcherTest, DestroySlotOnWorkerButDisableRuntimeFeature) {
+ TestScopedRuntime runtime;
+ runtime.mergeValues({{"envoy.restart_features.allow_slot_destroy_on_worker_threads", "false"}});
+
+ InstanceImpl tls;
+
+ Api::ApiPtr api = Api::createApiForTest();
+ Event::MockDispatcher main_dispatcher{"test_main_thread"};
+ Event::DispatcherPtr thread_dispatcher(api->allocateDispatcher("test_worker_thread"));
+
+ tls.registerThread(main_dispatcher, true);
+ tls.registerThread(*thread_dispatcher, false);
+
+ // Verify we have the expected dispatcher for the main thread.
+ EXPECT_EQ(&main_dispatcher, &tls.dispatcher());
+
+ auto slot = TypedSlot<>::makeUnique(tls);
+
+ Thread::ThreadPtr thread = Thread::threadFactoryForTest().createThread(
+ [&main_dispatcher, &thread_dispatcher, &tls, &slot]() {
+ // Ensure that the dispatcher update in tls posted during the above registerThread happens.
+ thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock);
+ // Verify we have the expected dispatcher for the new thread thread.
+ EXPECT_EQ(thread_dispatcher.get(), &tls.dispatcher());
+
+ // Skip the asserts in the thread.
+ Thread::SkipAsserts skip;
+ // Destroy the slot on worker thread will not call post() of main dispatcher.
+ EXPECT_CALL(main_dispatcher, post(_)).Times(0);
+ slot.reset();
+
+ thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock);
+ });
+ thread->join();
+
+ // Verify we still have the expected dispatcher for the main thread.
+ EXPECT_EQ(&main_dispatcher, &tls.dispatcher());
+
+ tls.shutdownGlobalThreading();
+ tls.shutdownThread();
+}
+
} // namespace ThreadLocal
} // namespace Envoy
1 change: 1 addition & 0 deletions bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def envoy_gloo_dependencies():
_repository_impl("envoy", patches=[
"@envoy_gloo//bazel/foreign_cc:0001-otel-exporter-status-code-fix.patch",
"@envoy_gloo//bazel/foreign_cc:0002-ratelimit-filter-state-hits-addend.patch",
"@envoy_gloo//bazel/foreign_cc:0003-deallocate-slots-on-worker-threads.patch", # https://github.com/envoyproxy/envoy/pull/33395
])
_repository_impl("json", build_file = "@envoy_gloo//bazel/external:json.BUILD")
_repository_impl("inja", build_file = "@envoy_gloo//bazel/external:inja.BUILD")
16 changes: 16 additions & 0 deletions changelog/v1.30.4-patch4/deallocate-tls-on-worker-thread.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
changelog:
- type: FIX
issueLink: https://github.com/solo-io/solo-projects/issues/6713
resolvesIssue: false
description: >
Backport an upstream Envoy fix that allows deallocating thread-local slots on worker threads.
- type: FIX
issueLink: https://github.com/solo-io/envoy-gloo/pull/292
resolvesIssue: false
description: >
Fix an issue in the ci/check_extensions_build_config.sh script. One of the
grep commands was simply grepping for 'commit', which meant that any other
line in the file that contained the same word would also show up in the
grep, thereby messing up any string processing that followed. We fix this
by making the grep a bit stricter, ie. ensuring that the line _starts_ with
'commit' (barring any preceding whitespace for indentation purposes).
2 changes: 1 addition & 1 deletion ci/check_extensions_build_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ get_UPSTREAM_REPO() {
get_envoy_commit_hash() {
local file_path="$1"
local commit_hash
commit_hash=$(grep -A 2 "envoy =" "$file_path" | grep commit | cut -d '"' -f 2)
commit_hash=$(grep -A 2 "envoy =" "$file_path" | grep '^ *commit' | cut -d '"' -f 2)
echo "$commit_hash"
}

Expand Down

0 comments on commit 466acb5

Please sign in to comment.