Skip to content

Commit

Permalink
Fetch .wasm from remote URI without depending on Listener. (envoyprox…
Browse files Browse the repository at this point in the history
…y#204)

* Fetch .wasm from remote URI without depending on Listener.

Signed-off-by: John Plevyak <jplevyak@gmail.com>

* Reactivate tests.

Signed-off-by: John Plevyak <jplevyak@gmail.com>
  • Loading branch information
jplevyak authored May 5, 2020
1 parent ad7f85b commit 0e2c9c0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
1 change: 1 addition & 0 deletions source/extensions/common/wasm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ envoy_cc_library(
"//include/envoy/server:lifecycle_notifier_interface",
"//source/common/buffer:buffer_lib",
"//source/common/common:enum_to_int",
"//source/common/config:remote_data_fetcher_lib",
"//source/common/http:message_lib",
"//source/common/http:utility_lib",
"//source/common/tracing:http_tracer_lib",
Expand Down
50 changes: 37 additions & 13 deletions source/extensions/common/wasm/wasm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
#include <memory>
#include <string>

#include "common/config/remote_data_fetcher.h"
#include "envoy/common/exception.h"
#include "envoy/config/wasm/v3/wasm.pb.validate.h"
#include "envoy/event/deferred_deletable.h"
#include "envoy/grpc/status.h"
#include "envoy/http/codes.h"
#include "envoy/local_info/local_info.h"
Expand All @@ -21,6 +23,7 @@
#include "common/buffer/buffer_impl.h"
#include "common/common/assert.h"
#include "common/common/base64.h"
#include "common/config/remote_data_fetcher.h"
#include "common/common/empty_string.h"
#include "common/common/enum_to_int.h"
#include "common/common/logger.h"
Expand Down Expand Up @@ -75,6 +78,22 @@ struct CodeCacheEntry {
MonotonicTime fetch_time;
};

class RemoteDataFetcherAdapter : public Config::DataFetcher::RemoteDataFetcherCallback,
public Event::DeferredDeletable {
public:
RemoteDataFetcherAdapter(std::function<void(std::string cb)> cb) : cb_(cb) {}
~RemoteDataFetcherAdapter() = default;
void onSuccess(const std::string& data) override { cb_(data); }
virtual void onFailure(Config::DataFetcher::FailureReason) override { cb_(""); }
void setFetcher(std::unique_ptr<Config::DataFetcher::RemoteDataFetcher>&& fetcher) {
fetcher_ = std::move(fetcher);
}

private:
std::function<void(std::string)> cb_;
std::unique_ptr<Config::DataFetcher::RemoteDataFetcher> fetcher_;
};

std::atomic<int64_t> active_wasm_;
std::mutex code_cache_mutex;
std::unordered_map<std::string, CodeCacheEntry>* code_cache = nullptr;
Expand Down Expand Up @@ -592,7 +611,7 @@ createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin, Stats::Sco
it->second.use_time = now;
if (it->second.in_progress) {
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
"createWasm: failed to load (in prpgress) from {}", source);
"createWasm: failed to load (in progress) from {}", source);
throw WasmException(
fmt::format("Failed to load WASM code (fetch in progress) from {}", source));
}
Expand Down Expand Up @@ -664,12 +683,9 @@ createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin, Stats::Sco
};

if (fetch) {
// NB: if the (fetching) exception is thrown below, the remote_data provider will be deleted
// immediately rather than completing the async fetch, so allow for self-delete.
auto remote_data_provider_holder =
std::make_shared<std::unique_ptr<Config::DataSource::RemoteAsyncDataProvider>>();
auto holder = std::make_shared<std::unique_ptr<Event::DeferredDeletable>>();
auto fetch_callback = [vm_config, complete_cb, source, &dispatcher,
remote_data_provider_holder](const std::string& code) {
holder](const std::string& code) {
{
std::lock_guard<std::mutex> guard(code_cache_mutex);
auto& e = (*code_cache)[vm_config.code().remote().sha256()];
Expand All @@ -684,16 +700,24 @@ createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin, Stats::Sco
complete_cb(code);
}
// NB: must be deleted explicitly.
dispatcher.deferredDelete(
Envoy::Event::DeferredDeletablePtr{remote_data_provider_holder->release()});
remote_data_provider_holder->reset();
if (*holder) {
dispatcher.deferredDelete(Envoy::Event::DeferredDeletablePtr{holder->release()});
}
};
remote_data_provider = std::make_unique<Config::DataSource::RemoteAsyncDataProvider>(
cluster_manager, init_manager, vm_config.code().remote(), dispatcher, random, true,
fetch_callback);
if (fail_if_code_not_cached) {
*remote_data_provider_holder = std::move(remote_data_provider);
auto adapter = std::make_unique<RemoteDataFetcherAdapter>(fetch_callback);
auto fetcher = std::make_unique<Config::DataFetcher::RemoteDataFetcher>(
cluster_manager, vm_config.code().remote().http_uri(), vm_config.code().remote().sha256(),
*adapter);
auto fetcher_ptr = fetcher.get();
adapter->setFetcher(std::move(fetcher));
*holder = std::move(adapter);
fetcher_ptr->fetch();
throw WasmException(fmt::format("Failed to load WASM code (fetching) from {}", source));
} else {
remote_data_provider = std::make_unique<Config::DataSource::RemoteAsyncDataProvider>(
cluster_manager, init_manager, vm_config.code().remote(), dispatcher, random, true,
fetch_callback);
}
} else {
complete_cb(code);
Expand Down

0 comments on commit 0e2c9c0

Please sign in to comment.