Skip to content

Commit

Permalink
Add Code cache TTL and tests. (envoyproxy#202)
Browse files Browse the repository at this point in the history
* Add Wasm code cache and option to fail if not cached.

Signed-off-by: John Plevyak <jplevyak@gmail.com>

* Add cache TTL to code cache with tests.

Signed-off-by: John Plevyak <jplevyak@gmail.com>
  • Loading branch information
jplevyak authored Apr 25, 2020
1 parent 11176a0 commit ad7f85b
Show file tree
Hide file tree
Showing 8 changed files with 420 additions and 362 deletions.
4 changes: 3 additions & 1 deletion source/common/config/datasource.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "envoy/api/api.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/event/deferred_deletable.h"
#include "envoy/init/manager.h"
#include "envoy/upstream/cluster_manager.h"

Expand Down Expand Up @@ -58,7 +59,8 @@ class LocalAsyncDataProvider {

using LocalAsyncDataProviderPtr = std::unique_ptr<LocalAsyncDataProvider>;

class RemoteAsyncDataProvider : public Config::DataFetcher::RemoteDataFetcherCallback,
class RemoteAsyncDataProvider : public Event::DeferredDeletable,
public Config::DataFetcher::RemoteDataFetcherCallback,
public Logger::Loggable<Logger::Id::config> {
public:
RemoteAsyncDataProvider(Upstream::ClusterManager& cm, Init::Manager& manager,
Expand Down
194 changes: 144 additions & 50 deletions source/extensions/common/wasm/wasm.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "extensions/common/wasm/wasm.h"

#include <stdio.h>
#include <algorithm>
#include <chrono>

#include <algorithm>
#include <cctype>
Expand Down Expand Up @@ -66,7 +68,16 @@ std::string Sha256(absl::string_view data) {

namespace {

// One entry of the process-wide cache of remotely fetched Wasm code. Entries
// are keyed by the SHA256 configured for the remote code source.
struct CodeCacheEntry {
  // Fetched code. Empty while the first fetch is outstanding and after a
  // failed fetch; an empty value on a finished entry is treated as a cached
  // failure by the lookup path.
  std::string code;
  // True while a remote fetch for this entry is outstanding. Explicitly
  // initialized so that an entry is never observed with an indeterminate flag,
  // regardless of how it was constructed.
  bool in_progress{false};
  // Time of the most recent lookup of this entry; compared against the cache
  // TTL when evicting stale entries.
  MonotonicTime use_time;
  // Time the most recent fetch was started; compared against the negative
  // caching interval before a failed fetch is retried.
  MonotonicTime fetch_time;
};

// NOTE(review): presumably the number of live Wasm VMs; its readers and writers
// are outside this view — confirm before relying on it.
std::atomic<int64_t> active_wasm_;
// Guards code_cache (both the pointer and the map it points to).
std::mutex code_cache_mutex;
// Cache of remotely fetched Wasm code, keyed by the configured SHA256 of the
// remote source. Allocated lazily on the first remote-code lookup and freed by
// clearCodeCacheForTesting(); only access it with code_cache_mutex held.
std::unordered_map<std::string, CodeCacheEntry>* code_cache = nullptr;

std::string Xor(absl::string_view a, absl::string_view b) {
ASSERT(a.size() == b.size());
Expand All @@ -85,6 +96,13 @@ ABSL_CONST_INIT absl::Mutex base_wasms_mutex_(absl::kConstInit);
absl::flat_hash_map<std::string, std::weak_ptr<WasmHandle>>* base_wasms_ = nullptr;

// Source name reported for local Wasm code that has no file path but is non-empty.
const std::string INLINE_STRING = "<inline>";
// NB: xDS currently does not support failing asynchronously, so we fail immediately
// if remote Wasm code is not cached and do a background fill.
const bool DEFAULT_FAIL_IF_NOT_CACHED = true;
// Current fail-on-cache-miss setting; overwritten by clearCodeCacheForTesting().
bool fail_if_code_not_cached = DEFAULT_FAIL_IF_NOT_CACHED;
// After a failed fetch, lookups within this many seconds fail from the cache
// instead of starting another fetch.
const int CODE_CACHE_SECONDS_NEGATIVE_CACHING = 10;
// Entries not looked up for longer than this many seconds are evicted.
const int CODE_CACHE_SECONDS_CACHING_TTL = 24 * 3600; // 24 hours.
// Added to the dispatcher's monotonic time when evaluating cache ages; zero
// unless changed through setTimeOffsetForCodeCacheForTesting().
MonotonicTime::duration cache_time_offset_for_testing{};

const uint8_t* decodeVarint(const uint8_t* pos, const uint8_t* end, uint32_t* out) {
uint32_t ret = 0;
Expand Down Expand Up @@ -530,6 +548,20 @@ WasmForeignFunction Wasm::getForeignFunction(absl::string_view function_name) {
return WasmForeignFunction();
}

// Test hook: discards the entire remote code cache and sets whether a lookup
// of uncached remote code fails immediately (true) or waits for the fetch to
// complete (false). Both updates happen under code_cache_mutex.
void clearCodeCacheForTesting(bool fail_if_not_cached) {
  const std::lock_guard<std::mutex> lock(code_cache_mutex);
  fail_if_code_not_cached = fail_if_not_cached;
  // Deleting a null pointer is a no-op, so no separate emptiness check is needed.
  delete code_cache;
  code_cache = nullptr;
}

// Test hook: sets the offset that is added to the dispatcher's monotonic time
// when the code cache evaluates entry ages, so tests can simulate the passage
// of time. The assignment is not synchronized with code_cache_mutex.
// TODO: remove this post #4160: Switch default to SimulatedTimeSystem.
void setTimeOffsetForCodeCacheForTesting(MonotonicTime::duration d) {
cache_time_offset_for_testing = d;
}

static void
createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager,
Expand All @@ -538,71 +570,133 @@ createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin, Stats::Sco
Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
std::unique_ptr<Context> root_context_for_testing, CreateWasmCallback&& cb) {
std::string source, code;
bool fetch = false;
if (vm_config.code().has_remote()) {
auto now = dispatcher.timeSource().monotonicTime() + cache_time_offset_for_testing;
source = vm_config.code().remote().http_uri().uri();
std::lock_guard<std::mutex> guard(code_cache_mutex);
if (!code_cache) {
code_cache = new std::remove_reference<decltype(*code_cache)>::type;
}
// Remove entries older than CODE_CACHE_SECONDS_CACHING_TTL except for our target.
for (auto it = code_cache->begin(); it != code_cache->end();) {
if (now - it->second.use_time > std::chrono::seconds(CODE_CACHE_SECONDS_CACHING_TTL) &&
it->first != vm_config.code().remote().sha256()) {
it = code_cache->erase(it);
} else {
++it;
}
}
auto it = code_cache->find(vm_config.code().remote().sha256());
if (it != code_cache->end()) {
it->second.use_time = now;
if (it->second.in_progress) {
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
"createWasm: failed to load (in progress) from {}", source);
throw WasmException(
fmt::format("Failed to load WASM code (fetch in progress) from {}", source));
}
code = it->second.code;
if (code.empty()) {
if (now - it->second.fetch_time <
std::chrono::seconds(CODE_CACHE_SECONDS_NEGATIVE_CACHING)) {
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
"createWasm: failed to load (cached) from {}", source);
throw WasmException(fmt::format("Failed to load WASM code (cached) from {}", source));
}
fetch = true; // Fetch failed, retry.
it->second.in_progress = true;
it->second.fetch_time = now;
}
} else {
fetch = true; // Not in cache, fetch.
auto& e = (*code_cache)[vm_config.code().remote().sha256()];
e.in_progress = true;
e.use_time = e.fetch_time = now;
}
} else if (vm_config.code().has_local()) {
code = Config::DataSource::read(vm_config.code().local(), true, api);
source = Config::DataSource::getPath(vm_config.code().local())
.value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
}

auto callback = [vm_config, scope, &cluster_manager, &dispatcher, &lifecycle_notifier, plugin, cb,
source,
context_ptr = root_context_for_testing ? root_context_for_testing.release()
: nullptr](const std::string& code) {
std::unique_ptr<Context> context(context_ptr);
if (code.empty()) {
throw WasmException(fmt::format("Failed to load WASM code from {}", source));
}
// Construct a unique identifier for the VM based on a hash of the code, vm configuration data
// and vm_id.
std::string vm_key = Sha256(vm_config.vm_id());
vm_key = Xor(vm_key, Sha256(vm_config.configuration()));
vm_key = Xor(vm_key, Sha256(code));
vm_key = Base64::encode(&*vm_key.begin(), vm_key.size());

std::shared_ptr<WasmHandle> wasm;
{
absl::MutexLock l(&base_wasms_mutex_);
if (!base_wasms_) {
base_wasms_ = new std::remove_reference<decltype(*base_wasms_)>::type;
}
auto it = base_wasms_->find(vm_key);
if (it != base_wasms_->end()) {
wasm = it->second.lock();
if (!wasm) {
base_wasms_->erase(it);
auto complete_cb =
[cb, vm_config, plugin, scope, &cluster_manager, &dispatcher, &lifecycle_notifier, source,
root_context_for_testing_ptr = root_context_for_testing.release()](std::string code) {
auto root_context = std::unique_ptr<Context>(root_context_for_testing_ptr);

std::string vm_key = Sha256(vm_config.vm_id());
vm_key = Xor(vm_key, Sha256(vm_config.configuration()));
vm_key = Xor(vm_key, Sha256(code));
vm_key = Base64::encode(&*vm_key.begin(), vm_key.size());

std::shared_ptr<WasmHandle> wasm;
{
absl::MutexLock l(&base_wasms_mutex_);
if (!base_wasms_) {
base_wasms_ = new std::remove_reference<decltype(*base_wasms_)>::type;
}
auto it = base_wasms_->find(vm_key);
if (it != base_wasms_->end()) {
wasm = it->second.lock();
if (!wasm) {
base_wasms_->erase(it);
}
}
if (!wasm) {
wasm = std::make_shared<WasmHandle>(std::make_shared<Wasm>(
vm_config.runtime(), vm_config.vm_id(), vm_config.configuration(), vm_key, scope,
cluster_manager, dispatcher));
// NB: we need the shared_ptr to have been created for shared_from_this() to work.
wasm->wasm()->initializeLifecycle(lifecycle_notifier);
if (!wasm->wasm()->initialize(code, vm_config.allow_precompiled())) {
throw WasmException(fmt::format("Failed to initialize WASM code from {}", source));
}
if (!root_context) {
wasm->wasm()->start(plugin);
} else {
wasm->wasm()->startForTesting(std::move(root_context), plugin);
}
(*base_wasms_)[vm_key] = wasm;
}
cb(std::move(wasm));
}
};

if (fetch) {
// NB: if the (fetching) exception is thrown below, the remote_data provider will be deleted
// immediately rather than completing the async fetch, so allow for self-delete.
auto remote_data_provider_holder =
std::make_shared<std::unique_ptr<Config::DataSource::RemoteAsyncDataProvider>>();
auto fetch_callback = [vm_config, complete_cb, source, &dispatcher,
remote_data_provider_holder](const std::string& code) {
{
std::lock_guard<std::mutex> guard(code_cache_mutex);
auto& e = (*code_cache)[vm_config.code().remote().sha256()];
e.in_progress = false;
e.code = code;
}
if (!wasm) {
wasm = std::make_shared<WasmHandle>(std::make_shared<Wasm>(
vm_config.runtime(), vm_config.vm_id(), vm_config.configuration(), vm_key, scope,
cluster_manager, dispatcher));
// NB: we need the shared_ptr to have been created for shared_from_this() to work.
wasm->wasm()->initializeLifecycle(lifecycle_notifier);
if (!wasm->wasm()->initialize(code, vm_config.allow_precompiled())) {
throw WasmException(fmt::format("Failed to initialize WASM code from {}", source));
}
if (!context) {
wasm->wasm()->start(plugin);
} else {
wasm->wasm()->startForTesting(std::move(context), plugin);
if (!fail_if_code_not_cached) {
if (code.empty()) {
throw WasmException(
fmt::format("Failed to load WASM code (fetch failed) from {}", source));
}
(*base_wasms_)[vm_key] = wasm;
complete_cb(code);
}
}

cb(std::move(wasm));
};

if (vm_config.code().has_remote()) {
// NB: must be deleted explicitly.
dispatcher.deferredDelete(
Envoy::Event::DeferredDeletablePtr{remote_data_provider_holder->release()});
remote_data_provider_holder->reset();
};
remote_data_provider = std::make_unique<Config::DataSource::RemoteAsyncDataProvider>(
cluster_manager, init_manager, vm_config.code().remote(), dispatcher, random, true,
std::move(callback));
} else if (vm_config.code().has_local()) {
callback(code);
fetch_callback);
if (fail_if_code_not_cached) {
*remote_data_provider_holder = std::move(remote_data_provider);
throw WasmException(fmt::format("Failed to load WASM code (fetching) from {}", source));
}
} else {
callback(EMPTY_STRING);
complete_cb(code);
}
}

Expand Down
6 changes: 5 additions & 1 deletion source/extensions/common/wasm/wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <atomic>
#include <deque>
#include <chrono>
#include <map>
#include <memory>

Expand All @@ -10,8 +11,8 @@
#include "envoy/common/exception.h"
#include "envoy/config/wasm/v3/wasm.pb.validate.h"
#include "envoy/http/filter.h"
#include "envoy/server/wasm.h"
#include "envoy/server/lifecycle_notifier.h"
#include "envoy/server/wasm.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats.h"
#include "envoy/thread_local/thread_local.h"
Expand Down Expand Up @@ -447,6 +448,9 @@ template <typename T> inline bool Wasm::setDatatype(uint64_t ptr, const T& t) {
return wasm_vm_->setMemory(ptr, sizeof(T), &t);
}

// Test hook: empties the remote Wasm code cache and sets whether loading
// remote code that is not yet cached fails immediately (true) or not (false).
void clearCodeCacheForTesting(bool fail_if_not_cached);
// Test hook: sets an offset applied to the monotonic clock used for code cache
// expiry so that tests can simulate elapsed time.
void setTimeOffsetForCodeCacheForTesting(MonotonicTime::duration d);

} // namespace Wasm
} // namespace Common
} // namespace Extensions
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/access_loggers/wasm/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ TEST_P(WasmAccessLogConfigTest, CreateWasmFromEmpty) {
AccessLog::InstanceSharedPtr instance;
EXPECT_THROW_WITH_MESSAGE(
instance = factory->createAccessLogInstance(*message, std::move(filter), context),
Common::Wasm::WasmException, "Failed to load WASM code from ");
Common::Wasm::WasmVmException, "Failed to create WASM VM with unspecified runtime.");
}

TEST_P(WasmAccessLogConfigTest, CreateWasmFromWASM) {
Expand Down
6 changes: 5 additions & 1 deletion test/extensions/common/wasm/wasm_test.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <stdio.h>

#include "envoy/server/lifecycle_notifier.h"

#include "common/common/hex.h"
Expand Down Expand Up @@ -37,7 +38,10 @@ class TestContext : public Extensions::Common::Wasm::Context {
MOCK_METHOD2(scriptLog_, void(spdlog::level::level_enum level, absl::string_view message));
};

class WasmCommonTest : public testing::TestWithParam<std::string> {};
// Fixture for the common Wasm tests; the string parameter selects the Wasm
// runtime under test (e.g. "v8").
class WasmCommonTest : public testing::TestWithParam<std::string> {
public:
  // Runs before every test: drops any code cached by a previous test and turns
  // off fail-on-cache-miss. Marked `override` so a signature mismatch with
  // testing::Test::SetUp becomes a compile error rather than a silently
  // unused method.
  void SetUp() override { clearCodeCacheForTesting(false); }
};

INSTANTIATE_TEST_SUITE_P(Runtimes, WasmCommonTest,
testing::Values("v8",
Expand Down
Loading

0 comments on commit ad7f85b

Please sign in to comment.