Skip to content

Commit

Permalink
Cache and share the base Wasm VM. (#157)
Browse files Browse the repository at this point in the history
* Cache and share the base Wasm (envoyproxy#387)

Cache and share the base Wasm.  Use the new definition of Wasm Key to
find the base Wasm and thread local Wasm.

Signed-off-by: John Plevyak <jplevyak@gmail.com>

* Only call proxy_on_vm_start() when the VM is actaully starting. (envoyproxy#400)

Signed-off-by: John Plevyak <jplevyak@gmail.com>

* Fix SEGV when reusing the base vm. (envoyproxy#413)

Signed-off-by: John Plevyak <jplevyak@gmail.com>

Co-authored-by: John Plevyak <jplevyak@gmail.com>
  • Loading branch information
PiotrSikora and jplevyak authored Feb 22, 2020
1 parent 1485c93 commit a7e5621
Show file tree
Hide file tree
Showing 13 changed files with 229 additions and 102 deletions.
8 changes: 4 additions & 4 deletions bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ REPOSITORY_LOCATIONS = dict(
urls = ["https://commondatastorage.googleapis.com/chromium-boringssl-docs/fips/boringssl-66005f41fbc3529ffe8d007708756720529da20d.tar.xz"],
),
com_google_absl = dict(
sha256 = "190b0c9e65ef0866b44c54b517b5a3e15b67a1001b34547f03f8f4d8553c2851",
strip_prefix = "abseil-cpp-63ee2f8877915a3565c29707dba8fe4d7822596a",
# 2020-01-08
urls = ["https://github.com/abseil/abseil-cpp/archive/63ee2f8877915a3565c29707dba8fe4d7822596a.tar.gz"],
sha256 = "19391fb4882601a65cb648d638c11aa301ce5f525ef02da1a9eafd22f72d7c59",
strip_prefix = "abseil-cpp-37dd2562ec830d547a1524bb306be313ac3f2556",
# 2020-01-29
urls = ["https://github.com/abseil/abseil-cpp/archive/37dd2562ec830d547a1524bb306be313ac3f2556.tar.gz"],
),
com_github_apache_thrift = dict(
sha256 = "7d59ac4fdcb2c58037ebd4a9da5f9a49e3e034bf75b3f26d9fe48ba3d8806e6b",
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/access_loggers/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ WasmAccessLogFactory::createAccessLogInstance(const Protobuf::Message& proto_con
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot->set([base_wasm, plugin, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, plugin, *configuration, dispatcher));
Common::Wasm::getOrCreateThreadLocalWasm(base_wasm, plugin, *configuration, dispatcher));
});
access_log->setTlsSlot(std::move(tls_slot));
};
Expand Down
149 changes: 99 additions & 50 deletions source/extensions/common/wasm/wasm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace {

std::atomic<int64_t> active_wasm_;

std::string base64Sha256(absl::string_view data) {
std::string Sha256(absl::string_view data) {
std::vector<uint8_t> digest(SHA256_DIGEST_LENGTH);
EVP_MD_CTX* ctx(EVP_MD_CTX_new());
auto rc = EVP_DigestInit(ctx, EVP_sha256());
Expand All @@ -65,11 +65,24 @@ std::string base64Sha256(absl::string_view data) {
rc = EVP_DigestFinal(ctx, digest.data(), nullptr);
RELEASE_ASSERT(rc == 1, "Failed to finalize digest");
EVP_MD_CTX_free(ctx);
return Base64::encode(reinterpret_cast<const char*>(&digest[0]), digest.size());
return std::string(reinterpret_cast<const char*>(&digest[0]), digest.size());
}

// Map from Wasm ID to the local Wasm instance.
thread_local absl::flat_hash_map<std::string, std::weak_ptr<WasmHandle>> local_wasms;
std::string Xor(absl::string_view a, absl::string_view b) {
ASSERT(a.size() == b.size());
std::string result;
result.reserve(a.size());
for (size_t i = 0; i < a.size(); i++) {
result.push_back(a[i] ^ b[i]);
}
return result;
}

// Map from Wasm Key to the local Wasm instance.
thread_local absl::flat_hash_map<std::string, std::weak_ptr<WasmHandle>> local_wasms_;
// Map from Wasm Key to the base Wasm instance, using a pointer to avoid the initialization fiasco.
ABSL_CONST_INIT absl::Mutex base_wasms_mutex_(absl::kConstInit);
absl::flat_hash_map<std::string, std::weak_ptr<WasmHandle>>* base_wasms_ = nullptr;

const std::string INLINE_STRING = "<inline>";

Expand All @@ -92,10 +105,11 @@ const uint8_t* decodeVarint(const uint8_t* pos, const uint8_t* end, uint32_t* ou
} // namespace

Wasm::Wasm(absl::string_view runtime, absl::string_view vm_id, absl::string_view vm_configuration,
Stats::ScopeSharedPtr scope, Upstream::ClusterManager& cluster_manager,
Event::Dispatcher& dispatcher)
: vm_id_(std::string(vm_id)), wasm_vm_(Common::Wasm::createWasmVm(runtime, scope)),
scope_(scope), cluster_manager_(cluster_manager), dispatcher_(dispatcher),
absl::string_view vm_key, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher)
: vm_id_(std::string(vm_id)), vm_key_(std::string(vm_key)),
wasm_vm_(Common::Wasm::createWasmVm(runtime, scope)), scope_(scope),
cluster_manager_(cluster_manager), dispatcher_(dispatcher),
time_source_(dispatcher.timeSource()), vm_configuration_(vm_configuration),
wasm_stats_(WasmStats{
ALL_WASM_STATS(POOL_COUNTER_PREFIX(*scope_, absl::StrCat("wasm.", runtime, ".")),
Expand Down Expand Up @@ -243,18 +257,22 @@ void Wasm::getFunctions() {
}
}

Wasm::Wasm(const Wasm& wasm, Event::Dispatcher& dispatcher)
: std::enable_shared_from_this<Wasm>(wasm), vm_id_(wasm.vm_id_),
vm_id_with_hash_(wasm.vm_id_with_hash_), started_from_(wasm.wasm_vm()->cloneable()),
scope_(wasm.scope_), cluster_manager_(wasm.cluster_manager_), dispatcher_(dispatcher),
time_source_(dispatcher.timeSource()), wasm_stats_(wasm.wasm_stats_),
stat_name_set_(wasm.stat_name_set_) {
Wasm::Wasm(WasmHandleSharedPtr& base_wasm_handle, Event::Dispatcher& dispatcher)
: std::enable_shared_from_this<Wasm>(*base_wasm_handle->wasm()),
vm_id_(base_wasm_handle->wasm()->vm_id_), vm_key_(base_wasm_handle->wasm()->vm_key_),
started_from_(base_wasm_handle->wasm()->wasm_vm()->cloneable()),
scope_(base_wasm_handle->wasm()->scope_),
cluster_manager_(base_wasm_handle->wasm()->cluster_manager_), dispatcher_(dispatcher),
time_source_(dispatcher.timeSource()), base_wasm_handle_(base_wasm_handle),
wasm_stats_(base_wasm_handle->wasm()->wasm_stats_),
stat_name_set_(base_wasm_handle->wasm()->stat_name_set_) {
if (started_from_ != Cloneable::NotCloneable) {
wasm_vm_ = wasm.wasm_vm()->clone();
wasm_vm_ = base_wasm_handle->wasm()->wasm_vm()->clone();
} else {
wasm_vm_ = Common::Wasm::createWasmVm(wasm.wasm_vm()->runtime(), scope_);
wasm_vm_ = Common::Wasm::createWasmVm(base_wasm_handle->wasm()->wasm_vm()->runtime(), scope_);
}
if (!initialize(wasm.code(), wasm.allow_precompiled())) {
if (!initialize(base_wasm_handle->wasm()->code(),
base_wasm_handle->wasm()->allow_precompiled())) {
throw WasmException("Failed to load WASM code");
}
active_wasm_++;
Expand All @@ -280,10 +298,6 @@ bool Wasm::initialize(const std::string& code, bool allow_precompiled) {
}

if (started_from_ == Cloneable::NotCloneable) {
// Construct a unique identifier for the VM based on the provided vm_id and a hash of the
// code.
vm_id_with_hash_ = vm_id_ + ":" + base64Sha256(code);

auto ok = wasm_vm_->load(code, allow_precompiled);
if (!ok) {
return false;
Expand Down Expand Up @@ -338,6 +352,16 @@ bool Wasm::initialize(const std::string& code, bool allow_precompiled) {
return true;
}

Context* Wasm::getOrCreateRootContext(const PluginSharedPtr& plugin) {
auto root_context = getRootContext(plugin->root_id_);
if (!root_context) {
auto context = std::make_unique<Context>(this, plugin);
root_context = context.get();
root_contexts_[plugin->root_id_] = std::move(context);
}
return root_context;
}

void Wasm::startVm(Context* root_context) {
/* Call "_start" function, and fallback to "__wasm_call_ctors" if the former is not available. */
if (_start_) {
Expand Down Expand Up @@ -482,10 +506,6 @@ static void createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin
Api::Api& api, std::unique_ptr<Context> root_context_for_testing,
Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
CreateWasmCallback&& cb) {
auto wasm = std::make_shared<WasmHandle>(
std::make_shared<Wasm>(vm_config.runtime(), vm_config.vm_id(), vm_config.configuration(),
scope, cluster_manager, dispatcher));

std::string source, code;
if (vm_config.code().has_remote()) {
source = vm_config.code().remote().http_uri().uri();
Expand All @@ -495,22 +515,50 @@ static void createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin
.value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
}

auto callback = [wasm, plugin, cb, source, allow_precompiled = vm_config.allow_precompiled(),
auto callback = [vm_config, scope, &cluster_manager, &dispatcher, plugin, cb, source,
context_ptr = root_context_for_testing ? root_context_for_testing.release()
: nullptr](const std::string& code) {
std::unique_ptr<Context> context(context_ptr);
if (code.empty()) {
throw WasmException(fmt::format("Failed to load WASM code from {}", source));
}
if (!wasm->wasm()->initialize(code, allow_precompiled)) {
throw WasmException(fmt::format("Failed to initialize WASM code from {}", source));
}
if (!context) {
wasm->wasm()->start(plugin);
} else {
wasm->wasm()->startForTesting(std::move(context), plugin);
// Construct a unique identifier for the VM based on a hash of the code, vm configuration data
// and vm_id.
std::string vm_key = Sha256(vm_config.vm_id());
vm_key = Xor(vm_key, Sha256(vm_config.configuration()));
vm_key = Xor(vm_key, Sha256(code));
vm_key = Base64::encode(&*vm_key.begin(), vm_key.size());

std::shared_ptr<WasmHandle> wasm;
{
absl::MutexLock l(&base_wasms_mutex_);
if (!base_wasms_) {
base_wasms_ = new std::remove_reference<decltype(*base_wasms_)>::type;
}
auto it = base_wasms_->find(vm_key);
if (it != base_wasms_->end()) {
wasm = it->second.lock();
if (!wasm) {
base_wasms_->erase(it);
}
}
if (!wasm) {
wasm = std::make_shared<WasmHandle>(std::make_shared<Wasm>(
vm_config.runtime(), vm_config.vm_id(), vm_config.configuration(), vm_key, scope,
cluster_manager, dispatcher));
if (!wasm->wasm()->initialize(code, vm_config.allow_precompiled())) {
throw WasmException(fmt::format("Failed to initialize WASM code from {}", source));
}
if (!context) {
wasm->wasm()->start(plugin);
} else {
wasm->wasm()->startForTesting(std::move(context), plugin);
}
(*base_wasms_)[vm_key] = wasm;
}
}
cb(wasm);

cb(std::move(wasm));
};

if (vm_config.code().has_remote()) {
Expand Down Expand Up @@ -543,40 +591,41 @@ void createWasmForTesting(const envoy::extensions::wasm::v3::VmConfig& vm_config
std::move(root_context_for_testing), remote_data_provider, std::move(cb));
}

WasmHandleSharedPtr createThreadLocalWasm(WasmHandle& base_wasm, PluginSharedPtr plugin,
WasmHandleSharedPtr createThreadLocalWasm(WasmHandleSharedPtr& base_wasm, PluginSharedPtr plugin,
absl::string_view configuration,
Event::Dispatcher& dispatcher) {
auto wasm = std::make_shared<WasmHandle>(std::make_shared<Wasm>(*base_wasm.wasm(), dispatcher));
Context* root_context = wasm->wasm()->start(plugin);
if (!wasm->wasm()->configure(root_context, plugin, configuration)) {
auto wasm_handle = std::make_shared<WasmHandle>(std::make_shared<Wasm>(base_wasm, dispatcher));
Context* root_context = wasm_handle->wasm()->start(plugin);
if (!wasm_handle->wasm()->configure(root_context, plugin, configuration)) {
throw WasmException("Failed to configure WASM code");
}
local_wasms[wasm->wasm()->vm_id_with_hash()] = wasm;
return wasm;
local_wasms_[wasm_handle->wasm()->vm_key()] = wasm_handle;
return wasm_handle;
}

WasmHandleSharedPtr getThreadLocalWasmPtr(absl::string_view vm_id) {
auto it = local_wasms.find(vm_id);
if (it == local_wasms.end()) {
WasmHandleSharedPtr getThreadLocalWasmPtr(absl::string_view vm_key) {
auto it = local_wasms_.find(vm_key);
if (it == local_wasms_.end()) {
return nullptr;
}
auto wasm = it->second.lock();
if (!wasm) {
local_wasms.erase(vm_id);
local_wasms_.erase(vm_key);
}
return wasm;
}

WasmHandleSharedPtr getOrCreateThreadLocalWasm(WasmHandle& base_wasm, PluginSharedPtr plugin,
WasmHandleSharedPtr getOrCreateThreadLocalWasm(WasmHandleSharedPtr base_wasm,
PluginSharedPtr plugin,
absl::string_view configuration,
Event::Dispatcher& dispatcher) {
auto wasm = getThreadLocalWasmPtr(base_wasm.wasm()->vm_id_with_hash());
if (wasm) {
auto root_context = wasm->wasm()->start(plugin);
if (!wasm->wasm()->configure(root_context, plugin, configuration)) {
auto wasm_handle = getThreadLocalWasmPtr(base_wasm->wasm()->vm_key());
if (wasm_handle) {
auto root_context = wasm_handle->wasm()->getOrCreateRootContext(plugin);
if (!wasm_handle->wasm()->configure(root_context, plugin, configuration)) {
throw WasmException("Failed to configure WASM code");
}
return wasm;
return wasm_handle;
}
return createThreadLocalWasm(base_wasm, plugin, configuration, dispatcher);
}
Expand Down
32 changes: 17 additions & 15 deletions source/extensions/common/wasm/wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ using VmConfig = envoy::extensions::wasm::v3::VmConfig;
using WasmForeignFunction =
std::function<WasmResult(Wasm&, absl::string_view, std::function<void*(size_t size)>)>;

class WasmHandle;

// Wasm execution instance. Manages the Envoy side of the Wasm interface.
class Wasm : public Logger::Loggable<Logger::Id::wasm>, public std::enable_shared_from_this<Wasm> {
public:
Wasm(absl::string_view runtime, absl::string_view vm_id, absl::string_view vm_configuration,
Stats::ScopeSharedPtr scope, Upstream::ClusterManager& cluster_manager,
Event::Dispatcher& dispatcher);
Wasm(const Wasm& other, Event::Dispatcher& dispatcher);
absl::string_view vm_key, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher);
Wasm(std::shared_ptr<WasmHandle>& other, Event::Dispatcher& dispatcher);
~Wasm();

bool initialize(const std::string& code, bool allow_precompiled = false);
Expand All @@ -68,11 +70,12 @@ class Wasm : public Logger::Loggable<Logger::Id::wasm>, public std::enable_share
Context* start(PluginSharedPtr plugin); // returns the root Context.

absl::string_view vm_id() const { return vm_id_; }
absl::string_view vm_id_with_hash() const { return vm_id_with_hash_; }
absl::string_view vm_key() const { return vm_key_; }
WasmVm* wasm_vm() const { return wasm_vm_.get(); }
Context* vm_context() const { return vm_context_.get(); }
Stats::StatNameSetSharedPtr stat_name_set() const { return stat_name_set_; }
Context* getRootContext(absl::string_view root_id) { return root_contexts_[root_id].get(); }
Context* getOrCreateRootContext(const PluginSharedPtr& plugin);
Context* getContext(uint32_t id) {
auto it = contexts_.find(id);
if (it != contexts_.end())
Expand Down Expand Up @@ -169,8 +172,8 @@ class Wasm : public Logger::Loggable<Logger::Id::wasm>, public std::enable_share
void establishEnvironment(); // Language specific environments.
void getFunctions(); // Get functions call into WASM.

std::string vm_id_; // User-provided vm_id.
std::string vm_id_with_hash_; // vm_id + hash of code.
std::string vm_id_; // User-provided vm_id.
std::string vm_key_; // Hash(code, vm configuration data, vm_id_)
std::unique_ptr<WasmVm> wasm_vm_;
Cloneable started_from_{Cloneable::NotCloneable};
Stats::ScopeSharedPtr scope_;
Expand Down Expand Up @@ -234,6 +237,8 @@ class Wasm : public Logger::Loggable<Logger::Id::wasm>, public std::enable_share
WasmCallVoid<1> on_log_;
WasmCallVoid<1> on_delete_;

std::shared_ptr<WasmHandle> base_wasm_handle_;

// Used by the base_wasm to enable non-clonable thread local Wasm(s) to be constructed.
std::string code_;
std::string vm_configuration_;
Expand Down Expand Up @@ -269,14 +274,9 @@ class WasmHandle : public Envoy::Server::Wasm,
public std::enable_shared_from_this<WasmHandle> {
public:
explicit WasmHandle(WasmSharedPtr wasm) : wasm_(wasm) {}
~WasmHandle() {
auto wasm = wasm_;
// NB: V8 will stack overflow during the stress test if we shutdown with the call stack in the
// ThreadLocal set call so shift to a fresh call stack.
wasm_->dispatcher().post([wasm] { wasm->shutdown(); });
}
~WasmHandle() { wasm_->shutdown(); }

const WasmSharedPtr& wasm() { return wasm_; }
WasmSharedPtr& wasm() { return wasm_; }

private:
WasmSharedPtr wasm_;
Expand Down Expand Up @@ -308,8 +308,10 @@ void createWasmForTesting(const VmConfig& vm_config, PluginSharedPtr plugin,
// Get an existing ThreadLocal VM matching 'vm_id' or nullptr if there isn't one.
WasmHandleSharedPtr getThreadLocalWasmPtr(absl::string_view vm_id);
// Get an existing ThreadLocal VM matching 'vm_id' or create one using 'base_wavm' by cloning or by
// using it it as a template.
WasmHandleSharedPtr getOrCreateThreadLocalWasm(WasmHandle& base_wasm, PluginSharedPtr plugin,
// using it it as a template. Note that 'base_wasm' typically is a const lambda capture and needs
// to be copied to be passed, hence the pass-by-value interface.
WasmHandleSharedPtr getOrCreateThreadLocalWasm(WasmHandleSharedPtr base_wasm,
PluginSharedPtr plugin,
absl::string_view configuration,
Event::Dispatcher& dispatcher);

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/wasm/wasm_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ FilterConfig::FilterConfig(const envoy::extensions::filters::http::wasm::v3::Was
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot_->set([base_wasm, plugin, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, plugin, *configuration, dispatcher));
Common::Wasm::getOrCreateThreadLocalWasm(base_wasm, plugin, *configuration, dispatcher));
});
};

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/network/wasm/wasm_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ FilterConfig::FilterConfig(const envoy::extensions::filters::network::wasm::v3::
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot_->set([base_wasm, plugin, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, plugin, *configuration, dispatcher));
Common::Wasm::getOrCreateThreadLocalWasm(base_wasm, plugin, *configuration, dispatcher));
});
};

Expand Down
5 changes: 2 additions & 3 deletions source/extensions/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ void WasmFactory::createWasm(const envoy::extensions::wasm::v3::WasmService& con
config.config().name(), config.config().root_id(), config.config().vm_config().vm_id(),
envoy::config::core::v3::TrafficDirection::UNSPECIFIED, context.localInfo(), nullptr);

auto callback = [&context, &config, plugin,
cb](std::shared_ptr<Common::Wasm::WasmHandle> base_wasm) {
auto callback = [&context, &config, plugin, cb](Common::Wasm::WasmHandleSharedPtr base_wasm) {
if (config.singleton()) {
// Return the WASM VM which will be stored as a singleton by the Server.
auto root_context = base_wasm->wasm()->start(plugin);
Expand All @@ -40,7 +39,7 @@ void WasmFactory::createWasm(const envoy::extensions::wasm::v3::WasmService& con
context.threadLocal().allocateSlot()->set([base_wasm, plugin,
configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, plugin, *configuration, dispatcher));
Common::Wasm::getOrCreateThreadLocalWasm(base_wasm, plugin, *configuration, dispatcher));
});
// Do not return this WASM VM since this is per-thread. Returning it would indicate that
// this is a singleton.
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/access_loggers/wasm/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ TEST_P(WasmAccessLogConfigTest, CreateWasmFromEmpty) {
AccessLog::InstanceSharedPtr instance;
EXPECT_THROW_WITH_MESSAGE(
instance = factory->createAccessLogInstance(*message, std::move(filter), context),
Common::Wasm::WasmVmException, "Failed to create WASM VM with unspecified runtime.");
Common::Wasm::WasmException, "Failed to load WASM code from ");
}

TEST_P(WasmAccessLogConfigTest, CreateWasmFromWASM) {
Expand Down
Loading

0 comments on commit a7e5621

Please sign in to comment.