Skip to content

Commit

Permalink
xDS: gRPC connection failure shouldn't make Envoy continue startup (#…
Browse files Browse the repository at this point in the history
…8152)

Currently, if the gRPC config stream disconnects while Envoy is waiting for
the initial xDS response, the xDS implementations' onConfigUpdateFailed() will
allow Envoy startup to continue. This may cause Envoy to begin taking
traffic while route/cluster/endpoint config is still missing, and to
return "404 NR" or "503 NR".

This change makes Envoy wait for the initial xDS response until
initial_fetch_timeout elapses, if one is specified.

Risk Level: Medium
Testing: existing test cases updated
Fixes #8046

Signed-off-by: lhuang8 <lhuang8@ebay.com>
  • Loading branch information
l8huang authored and htuch committed Sep 19, 2019
1 parent 272ee70 commit d42e14e
Show file tree
Hide file tree
Showing 26 changed files with 110 additions and 64 deletions.
5 changes: 2 additions & 3 deletions source/common/config/delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,10 @@ void DeltaSubscriptionState::handleBadResponse(const EnvoyException& e, UpdateAc
}

void DeltaSubscriptionState::handleEstablishmentFailure() {
disableInitFetchTimeoutTimer();
// New gRPC stream will be established and send requests again.
// If init_fetch_timeout is non-zero, server will continue startup after it timeout
stats_.update_failure_.inc();
stats_.update_attempt_.inc();
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
nullptr);
}

envoy::api::v2::DeltaDiscoveryRequest DeltaSubscriptionState::getNextRequest() {
Expand Down
7 changes: 7 additions & 0 deletions source/common/config/grpc_mux_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,14 @@ void GrpcMuxSubscriptionImpl::onConfigUpdateFailed(ConfigUpdateFailureReason rea
ENVOY_LOG(warn, "gRPC config for {} rejected: {}", type_url_, e->what());
break;
}

stats_.update_attempt_.inc();
if (reason == Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure) {
// New gRPC stream will be established and send requests again.
// If init_fetch_timeout is non-zero, server will continue startup after it timeout
return;
}

callbacks_.onConfigUpdateFailed(reason, e);
}

Expand Down
50 changes: 34 additions & 16 deletions source/common/config/http_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ HttpSubscriptionImpl::HttpSubscriptionImpl(
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
ENVOY_LOG(warn, "REST config: initial fetch timed out for", path_);
stats_.init_fetch_timeout_.inc();
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout,
nullptr);
handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
});
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}
Expand Down Expand Up @@ -77,8 +74,7 @@ void HttpSubscriptionImpl::parseResponse(const Http::Message& response) {
try {
MessageUtil::loadFromJson(response.bodyAsString(), message, validation_visitor_);
} catch (const EnvoyException& e) {
ENVOY_LOG(warn, "REST config JSON conversion error: {}", e.what());
handleFailure(nullptr);
handleFailure(Config::ConfigUpdateFailureReason::UpdateRejected, &e);
return;
}
try {
Expand All @@ -87,23 +83,45 @@ void HttpSubscriptionImpl::parseResponse(const Http::Message& response) {
stats_.version_.set(HashUtil::xxHash64(request_.version_info()));
stats_.update_success_.inc();
} catch (const EnvoyException& e) {
ENVOY_LOG(warn, "REST config update rejected: {}", e.what());
stats_.update_rejected_.inc();
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
handleFailure(Config::ConfigUpdateFailureReason::UpdateRejected, &e);
}
}

void HttpSubscriptionImpl::onFetchComplete() {}

void HttpSubscriptionImpl::onFetchFailure(const EnvoyException* e) {
disableInitFetchTimeoutTimer();
ENVOY_LOG(warn, "REST config update failed: {}", e != nullptr ? e->what() : "fetch failure");
handleFailure(e);
void HttpSubscriptionImpl::onFetchFailure(Config::ConfigUpdateFailureReason reason,
const EnvoyException* e) {
handleFailure(reason, e);
}

void HttpSubscriptionImpl::handleFailure(const EnvoyException* e) {
stats_.update_failure_.inc();
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, e);
void HttpSubscriptionImpl::handleFailure(Config::ConfigUpdateFailureReason reason,
const EnvoyException* e) {

switch (reason) {
case Config::ConfigUpdateFailureReason::ConnectionFailure:
ENVOY_LOG(warn, "REST update for {} failed", path_);
stats_.update_failure_.inc();
break;
case Config::ConfigUpdateFailureReason::FetchTimedout:
ENVOY_LOG(warn, "REST config: initial fetch timeout for {}", path_);
stats_.init_fetch_timeout_.inc();
disableInitFetchTimeoutTimer();
break;
case Config::ConfigUpdateFailureReason::UpdateRejected:
ASSERT(e != nullptr);
ENVOY_LOG(warn, "REST config for {} rejected: {}", path_, e->what());
stats_.update_rejected_.inc();
disableInitFetchTimeoutTimer();
break;
}

if (reason == Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure) {
// New requests will be sent again.
// If init_fetch_timeout is non-zero, server will continue startup after it timeout
return;
}

callbacks_.onConfigUpdateFailed(reason, e);
}

void HttpSubscriptionImpl::disableInitFetchTimeoutTimer() {
Expand Down
4 changes: 2 additions & 2 deletions source/common/config/http_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
void createRequest(Http::Message& request) override;
void parseResponse(const Http::Message& response) override;
void onFetchComplete() override;
void onFetchFailure(const EnvoyException* e) override;
void onFetchFailure(Config::ConfigUpdateFailureReason reason, const EnvoyException* e) override;

private:
void handleFailure(const EnvoyException* e);
void handleFailure(Config::ConfigUpdateFailureReason reason, const EnvoyException* e);
void disableInitFetchTimeoutTimer();

std::string path_;
Expand Down
8 changes: 5 additions & 3 deletions source/common/http/rest_api_fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ void RestApiFetcher::onSuccess(Http::MessagePtr&& response) {
try {
parseResponse(*response);
} catch (EnvoyException& e) {
onFetchFailure(&e);
onFetchFailure(Config::ConfigUpdateFailureReason::UpdateRejected, &e);
}

requestComplete();
}

void RestApiFetcher::onFailure(Http::AsyncClient::FailureReason) {
onFetchFailure(nullptr);
void RestApiFetcher::onFailure(Http::AsyncClient::FailureReason reason) {
// Currently Http::AsyncClient::FailureReason only has one value: "Reset".
ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
onFetchFailure(Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr);
requestComplete();
}

Expand Down
5 changes: 4 additions & 1 deletion source/common/http/rest_api_fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <chrono>
#include <string>

#include "envoy/config/subscription.h"
#include "envoy/event/dispatcher.h"
#include "envoy/runtime/runtime.h"
#include "envoy/upstream/cluster_manager.h"
Expand Down Expand Up @@ -46,9 +47,11 @@ class RestApiFetcher : public Http::AsyncClient::Callbacks {

/**
* This will be called if the fetch fails (either due to non-200 response, network error, etc.).
* @param reason supplies the fetch failure reason.
* @param e supplies any exception data on why the fetch failed. May be nullptr.
*/
virtual void onFetchFailure(const EnvoyException* e) PURE;
virtual void onFetchFailure(Config::ConfigUpdateFailureReason reason,
const EnvoyException* e) PURE;

protected:
const std::string remote_cluster_name_;
Expand Down
5 changes: 3 additions & 2 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ void RdsRouteConfigSubscription::onConfigUpdate(
}
}

void RdsRouteConfigSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
const EnvoyException*) {
void RdsRouteConfigSubscription::onConfigUpdateFailed(
Envoy::Config::ConfigUpdateFailureReason reason, const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
init_target_.ready();
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/scoped_rds.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ class ScopedRdsConfigSubscription : public Envoy::Config::DeltaConfigSubscriptio
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& version_info) override;
void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) override {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
DeltaConfigSubscriptionInstance::onConfigUpdateFailed();
}
std::string resourceName(const ProtobufWkt::Any& resource) override {
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/vhds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ VhdsSubscription::VhdsSubscription(RouteConfigUpdatePtr& config_update_info,
*scope_, *this);
}

void VhdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
void VhdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
init_target_.ready();
Expand Down
3 changes: 2 additions & 1 deletion source/common/runtime/runtime_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,9 @@ void RtdsSubscription::onConfigUpdate(
onConfigUpdate(unwrapped_resource, resources[0].version());
}

void RtdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
void RtdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
init_target_.ready();
Expand Down
4 changes: 3 additions & 1 deletion source/common/secret/sds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ void SdsApi::onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Res
onConfigUpdate(unwrapped_resource, resources[0].version());
}

void SdsApi::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason, const EnvoyException*) {
void SdsApi::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad config.
init_target_.ready();
}
Expand Down
3 changes: 2 additions & 1 deletion source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ void CdsApiImpl::onConfigUpdate(
}
}

void CdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
void CdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
runInitializeCallbackIfAny();
Expand Down
6 changes: 1 addition & 5 deletions source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,7 @@ bool EdsClusterImpl::updateHostsPerLocality(

void EdsClusterImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
// We should not call onPreInitComplete if this method is called because of stream disconnection.
// This might potentially hang the initialization forever, if init_fetch_timeout is disabled.
if (reason == Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure) {
return;
}
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad config.
onPreInitComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ void ClientSslAuthConfig::parseResponse(const Http::Message& message) {
stats_.total_principals_.set(new_principals->size());
}

void ClientSslAuthConfig::onFetchFailure(const EnvoyException*) { stats_.update_failure_.inc(); }
void ClientSslAuthConfig::onFetchFailure(Config::ConfigUpdateFailureReason, const EnvoyException*) {
stats_.update_failure_.inc();
}

static const std::string Path = "/v1/certs/list/approved";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <unordered_set>

#include "envoy/config/filter/network/client_ssl_auth/v2/client_ssl_auth.pb.h"
#include "envoy/config/subscription.h"
#include "envoy/network/filter.h"
#include "envoy/runtime/runtime.h"
#include "envoy/stats/scope.h"
Expand Down Expand Up @@ -94,7 +95,7 @@ class ClientSslAuthConfig : public Http::RestApiFetcher {
void createRequest(Http::Message& request) override;
void parseResponse(const Http::Message& response) override;
void onFetchComplete() override {}
void onFetchFailure(const EnvoyException* e) override;
void onFetchFailure(Config::ConfigUpdateFailureReason reason, const EnvoyException* e) override;

ThreadLocal::SlotPtr tls_;
Network::Address::IpList ip_white_list_;
Expand Down
3 changes: 2 additions & 1 deletion source/server/lds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ void LdsApiImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::An
onConfigUpdate(to_add_repeated, to_remove_repeated, version_info);
}

void LdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
void LdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
init_target_.ready();
Expand Down
8 changes: 8 additions & 0 deletions test/common/config/delta_subscription_state_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,14 @@ TEST_F(DeltaSubscriptionStateTest, AddedAndRemoved) {
ack.error_detail_.message());
}

TEST_F(DeltaSubscriptionStateTest, handleEstablishmentFailure) {
EXPECT_CALL(callbacks_, onConfigUpdateFailed(_, _)).Times(0);

state_.handleEstablishmentFailure();
EXPECT_EQ(stats_.update_failure_.value(), 1);
EXPECT_EQ(stats_.update_attempt_.value(), 1);
}

} // namespace
} // namespace Config
} // namespace Envoy
8 changes: 6 additions & 2 deletions test/common/config/grpc_subscription_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ TEST_F(GrpcSubscriptionImplTest, StreamCreationFailure) {
InSequence s;
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(nullptr));

// onConfigUpdateFailed() should not be called for gRPC stream connection failure
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _))
.Times(0);
EXPECT_CALL(random_, random());
EXPECT_CALL(*timer_, enableTimer(_, _));
subscription_->start({"cluster0", "cluster1"});
Expand All @@ -38,8 +40,10 @@ TEST_F(GrpcSubscriptionImplTest, StreamCreationFailure) {
TEST_F(GrpcSubscriptionImplTest, RemoteStreamClose) {
startSubscription({"cluster0", "cluster1"});
EXPECT_TRUE(statsAre(1, 0, 0, 0, 0, 0));
// onConfigUpdateFailed() should not be called for gRPC stream connection failure
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _))
.Times(0);
EXPECT_CALL(*timer_, enableTimer(_, _));
EXPECT_CALL(random_, random());
subscription_->grpcMux().grpcStreamForTest().onRemoteClose(Grpc::Status::GrpcStatus::Canceled,
Expand Down
11 changes: 6 additions & 5 deletions test/common/config/http_subscription_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ TEST_F(HttpSubscriptionImplTest, OnRequestReset) {
EXPECT_CALL(random_gen_, random()).WillOnce(Return(0));
EXPECT_CALL(*timer_, enableTimer(_, _));
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _))
.Times(0);
http_callbacks_->onFailure(Http::AsyncClient::FailureReason::Reset);
EXPECT_TRUE(statsAre(1, 0, 0, 1, 0, 0));
timerTick();
Expand All @@ -34,14 +35,14 @@ TEST_F(HttpSubscriptionImplTest, BadJsonRecovery) {
EXPECT_CALL(random_gen_, random()).WillOnce(Return(0));
EXPECT_CALL(*timer_, enableTimer(_, _));
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, _));
http_callbacks_->onSuccess(std::move(message));
EXPECT_TRUE(statsAre(1, 0, 0, 1, 0, 0));
EXPECT_TRUE(statsAre(1, 0, 1, 0, 0, 0));
request_in_progress_ = false;
timerTick();
EXPECT_TRUE(statsAre(2, 0, 0, 1, 0, 0));
EXPECT_TRUE(statsAre(2, 0, 1, 0, 0, 0));
deliverConfigUpdate({"cluster0", "cluster1"}, "0", true);
EXPECT_TRUE(statsAre(3, 1, 0, 1, 0, 7148434200721666028));
EXPECT_TRUE(statsAre(3, 1, 1, 0, 0, 7148434200721666028));
}

TEST_F(HttpSubscriptionImplTest, ConfigNotModified) {
Expand Down
3 changes: 2 additions & 1 deletion test/common/config/subscription_factory_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ TEST_F(SubscriptionFactoryTest, GrpcSubscription) {
}));
EXPECT_CALL(random_, random());
EXPECT_CALL(dispatcher_, createTimer_(_)).Times(2);
EXPECT_CALL(callbacks_, onConfigUpdateFailed(_, _));
// onConfigUpdateFailed() should not be called for gRPC stream connection failure
EXPECT_CALL(callbacks_, onConfigUpdateFailed(_, _)).Times(0);
subscriptionFromConfigSource(config)->start({"static_cluster"});
}

Expand Down
4 changes: 4 additions & 0 deletions test/common/config/subscription_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ TEST_P(SubscriptionImplInitFetchTimeoutTest, InitialFetchTimeout) {
expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds(1000));
startSubscription({"cluster0", "cluster1"});
statsAre(1, 0, 0, 0, 0, 0);
if (GetParam() == SubscriptionType::Http) {
expectDisableInitFetchTimeoutTimer();
}
expectConfigUpdateFailed();

callInitFetchTimeoutCb();
statsAre(1, 0, 0, 0, 1, 0);
}
Expand Down
4 changes: 2 additions & 2 deletions test/common/router/rds_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ TEST_F(RdsImplTest, FailureSubscription) {
setup();

EXPECT_CALL(init_watcher_, ready());
rds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
{});
// onConfigUpdateFailed() should not be called for gRPC stream connection failure
rds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, {});
}

class RouteConfigProviderManagerImplTest : public RdsTestBase {
Expand Down
5 changes: 3 additions & 2 deletions test/common/runtime/runtime_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,9 @@ TEST_F(RtdsLoaderImplTest, FailureSubscription) {
setup();

EXPECT_CALL(init_watcher_, ready());
rtds_callbacks_[0]->onConfigUpdateFailed(
Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, {});
// onConfigUpdateFailed() should not be called for gRPC stream connection failure
rtds_callbacks_[0]->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout,
{});

EXPECT_EQ(0, store_.counter("runtime.load_error").value());
EXPECT_EQ(1, store_.counter("runtime.load_success").value());
Expand Down
Loading

0 comments on commit d42e14e

Please sign in to comment.