diff --git a/include/envoy/config/BUILD b/include/envoy/config/BUILD index 2b2504620c99..001d3cbe46af 100644 --- a/include/envoy/config/BUILD +++ b/include/envoy/config/BUILD @@ -66,6 +66,14 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "watch_map_interface", + hdrs = ["watch_map.h"], + deps = [ + ":subscription_interface", + ], +) + envoy_cc_library( name = "xds_grpc_context_interface", hdrs = ["xds_grpc_context.h"], diff --git a/include/envoy/config/grpc_mux.h b/include/envoy/config/grpc_mux.h index 4888e4e7e716..1d9920e2d5e3 100644 --- a/include/envoy/config/grpc_mux.h +++ b/include/envoy/config/grpc_mux.h @@ -25,6 +25,7 @@ struct ControlPlaneStats { ALL_CONTROL_PLANE_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) }; +// TODO(fredlas) redundant to SubscriptionCallbacks; remove this one. class GrpcMuxCallbacks { public: virtual ~GrpcMuxCallbacks() = default; diff --git a/source/common/config/BUILD b/source/common/config/BUILD index 378512d4df5f..f3bcaacfdbb7 100644 --- a/source/common/config/BUILD +++ b/source/common/config/BUILD @@ -32,18 +32,20 @@ envoy_cc_library( ) envoy_cc_library( - name = "filesystem_subscription_lib", - srcs = ["filesystem_subscription_impl.cc"], - hdrs = ["filesystem_subscription_impl.h"], + name = "config_provider_lib", + srcs = ["config_provider_impl.cc"], + hdrs = ["config_provider_impl.h"], deps = [ - "//include/envoy/config:subscription_interface", - "//include/envoy/event:dispatcher_interface", - "//include/envoy/filesystem:filesystem_interface", - "//source/common/common:minimal_logger_lib", - "//source/common/config:utility_lib", + ":utility_lib", + "//include/envoy/config:config_provider_interface", + "//include/envoy/config:config_provider_manager_interface", + "//include/envoy/init:manager_interface", + "//include/envoy/server:admin_interface", + "//include/envoy/server:config_tracker_interface", + "//include/envoy/singleton:instance_interface", + "//include/envoy/thread_local:thread_local_interface", + "//source/common/init:target_lib", "//source/common/protobuf", - "//source/common/protobuf:message_validator_lib", - "//source/common/protobuf:utility_lib", ], ) @@ -100,6 +102,22 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "filesystem_subscription_lib", + srcs = ["filesystem_subscription_impl.cc"], + hdrs = ["filesystem_subscription_impl.h"], + deps = [ + "//include/envoy/config:subscription_interface", + "//include/envoy/event:dispatcher_interface", + "//include/envoy/filesystem:filesystem_interface", + "//source/common/common:minimal_logger_lib", + "//source/common/config:utility_lib", + "//source/common/protobuf", + "//source/common/protobuf:message_validator_lib", + "//source/common/protobuf:utility_lib", + ], +) + envoy_cc_library( name = "grpc_stream_lib", hdrs = ["grpc_stream.h"], @@ -265,12 +283,6 @@ envoy_cc_library( ], ) -envoy_cc_library( - name = "resources_lib", - hdrs = ["resources.h"], - deps = ["//source/common/singleton:const_singleton"], -) - envoy_cc_library( name = "rds_json_lib", srcs = ["rds_json.cc"], @@ -288,6 +300,25 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "resources_lib", + hdrs = ["resources.h"], + deps = ["//source/common/singleton:const_singleton"], +) + +envoy_cc_library( + name = "remote_data_fetcher_lib", + srcs = ["remote_data_fetcher.cc"], + hdrs = ["remote_data_fetcher.h"], + deps = [ + "//include/envoy/upstream:cluster_manager_interface", + "//source/common/common:hex_lib", + "//source/common/crypto:utility_lib", + "//source/common/http:utility_lib", + "@envoy_api//envoy/api/v2/core:http_uri_cc", + ], +) + envoy_cc_library( name = "runtime_utility_lib", srcs = ["runtime_utility.cc"], @@ -361,42 +392,23 @@ envoy_cc_library( ) envoy_cc_library( - name = "well_known_names", - srcs = ["well_known_names.cc"], - hdrs = ["well_known_names.h"], + name = "watch_map_lib", + srcs = ["watch_map.cc"], + hdrs = ["watch_map.h"], deps = [ + "//include/envoy/config:subscription_interface", "//source/common/common:assert_lib", - "//source/common/singleton:const_singleton", - ], -) - -envoy_cc_library( - name = "config_provider_lib", - srcs = ["config_provider_impl.cc"], - hdrs = ["config_provider_impl.h"], - deps = [ - ":utility_lib", - "//include/envoy/config:config_provider_interface", - "//include/envoy/config:config_provider_manager_interface", - "//include/envoy/init:manager_interface", - "//include/envoy/server:admin_interface", - "//include/envoy/server:config_tracker_interface", - "//include/envoy/singleton:instance_interface", - "//include/envoy/thread_local:thread_local_interface", - "//source/common/init:target_lib", + "//source/common/common:minimal_logger_lib", "//source/common/protobuf", ], ) envoy_cc_library( - name = "remote_data_fetcher_lib", - srcs = ["remote_data_fetcher.cc"], - hdrs = ["remote_data_fetcher.h"], + name = "well_known_names", + srcs = ["well_known_names.cc"], + hdrs = ["well_known_names.h"], deps = [ - "//include/envoy/upstream:cluster_manager_interface", - "//source/common/common:hex_lib", - "//source/common/crypto:utility_lib", - "//source/common/http:utility_lib", - "@envoy_api//envoy/api/v2/core:http_uri_cc", + "//source/common/common:assert_lib", + "//source/common/singleton:const_singleton", ], ) diff --git a/source/common/config/watch_map.cc b/source/common/config/watch_map.cc new file mode 100644 index 000000000000..718231009101 --- /dev/null +++ b/source/common/config/watch_map.cc @@ -0,0 +1,170 @@ +#include "common/config/watch_map.h" + +namespace Envoy { +namespace Config { + +Watch* WatchMap::addWatch(SubscriptionCallbacks& callbacks) { + auto watch = std::make_unique(callbacks); + Watch* watch_ptr = watch.get(); + wildcard_watches_.insert(watch_ptr); + watches_.insert(std::move(watch)); + return watch_ptr; +} + +void WatchMap::removeWatch(Watch* watch) { + wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone. + watches_.erase(watch); +} + +AddedRemoved WatchMap::updateWatchInterest(Watch* watch, + const std::set& update_to_these_names) { + if (update_to_these_names.empty()) { + wildcard_watches_.insert(watch); + } else { + wildcard_watches_.erase(watch); + } + + std::vector newly_added_to_watch; + std::set_difference(update_to_these_names.begin(), update_to_these_names.end(), + watch->resource_names_.begin(), watch->resource_names_.end(), + std::inserter(newly_added_to_watch, newly_added_to_watch.begin())); + + std::vector newly_removed_from_watch; + std::set_difference(watch->resource_names_.begin(), watch->resource_names_.end(), + update_to_these_names.begin(), update_to_these_names.end(), + std::inserter(newly_removed_from_watch, newly_removed_from_watch.begin())); + + watch->resource_names_ = update_to_these_names; + + return AddedRemoved(findAdditions(newly_added_to_watch, watch), + findRemovals(newly_removed_from_watch, watch)); +} + +absl::flat_hash_set WatchMap::watchesInterestedIn(const std::string& resource_name) { + absl::flat_hash_set ret = wildcard_watches_; + auto watches_interested = watch_interest_.find(resource_name); + if (watches_interested != watch_interest_.end()) { + for (const auto& watch : watches_interested->second) { + ret.insert(watch); + } + } + return ret; +} + +void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField& resources, + const std::string& version_info) { + if (watches_.empty()) { + ENVOY_LOG(warn, "WatchMap::onConfigUpdate: there are no watches!"); + return; + } + SubscriptionCallbacks& name_getter = (*watches_.begin())->callbacks_; + + // Build a map from watches, to the set of updated resources that each watch cares about. Each + // entry in the map is then a nice little bundle that can be fed directly into the individual + // onConfigUpdate()s. + absl::flat_hash_map> per_watch_updates; + for (const auto& r : resources) { + const absl::flat_hash_set& interested_in_r = + watchesInterestedIn(name_getter.resourceName(r)); + for (const auto& interested_watch : interested_in_r) { + per_watch_updates[interested_watch].Add()->CopyFrom(r); + } + } + + // We just bundled up the updates into nice per-watch packages. Now, deliver them. + for (auto& watch : watches_) { + auto this_watch_updates = per_watch_updates.find(watch); + if (this_watch_updates == per_watch_updates.end()) { + // This update included no resources this watch cares about - so we do an empty + // onConfigUpdate(), to notify the watch that its resources - if they existed before this - + // were dropped. + watch->callbacks_.onConfigUpdate({}, version_info); + } else { + watch->callbacks_.onConfigUpdate(this_watch_updates->second, version_info); + } + } +} + +void WatchMap::onConfigUpdate( + const Protobuf::RepeatedPtrField& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info) { + // Build a pair of maps: from watches, to the set of resources {added,removed} that each watch + // cares about. Each entry in the map-pair is then a nice little bundle that can be fed directly + // into the individual onConfigUpdate()s. + absl::flat_hash_map> per_watch_added; + for (const auto& r : added_resources) { + const absl::flat_hash_set& interested_in_r = watchesInterestedIn(r.name()); + for (const auto& interested_watch : interested_in_r) { + per_watch_added[interested_watch].Add()->CopyFrom(r); + } + } + absl::flat_hash_map> per_watch_removed; + for (const auto& r : removed_resources) { + const absl::flat_hash_set& interested_in_r = watchesInterestedIn(r); + for (const auto& interested_watch : interested_in_r) { + *per_watch_removed[interested_watch].Add() = r; + } + } + + // We just bundled up the updates into nice per-watch packages. Now, deliver them. + for (const auto& added : per_watch_added) { + const Watch* cur_watch = added.first; + auto removed = per_watch_removed.find(cur_watch); + if (removed == per_watch_removed.end()) { + // additions only, no removals + cur_watch->callbacks_.onConfigUpdate(added.second, {}, system_version_info); + } else { + // both additions and removals + cur_watch->callbacks_.onConfigUpdate(added.second, removed->second, system_version_info); + // Drop the removals now, so the final removals-only pass won't use them. + per_watch_removed.erase(removed); + } + } + // Any removals-only updates will not have been picked up in the per_watch_added loop. + for (auto& removed : per_watch_removed) { + removed.first->callbacks_.onConfigUpdate({}, removed.second, system_version_info); + } +} + +void WatchMap::onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) { + for (auto& watch : watches_) { + watch->callbacks_.onConfigUpdateFailed(reason, e); + } +} + +std::set WatchMap::findAdditions(const std::vector& newly_added_to_watch, + Watch* watch) { + std::set newly_added_to_subscription; + for (const auto& name : newly_added_to_watch) { + auto entry = watch_interest_.find(name); + if (entry == watch_interest_.end()) { + newly_added_to_subscription.insert(name); + watch_interest_[name] = {watch}; + } else { + entry->second.insert(watch); + } + } + return newly_added_to_subscription; +} + +std::set +WatchMap::findRemovals(const std::vector& newly_removed_from_watch, Watch* watch) { + std::set newly_removed_from_subscription; + for (const auto& name : newly_removed_from_watch) { + auto entry = watch_interest_.find(name); + RELEASE_ASSERT( + entry != watch_interest_.end(), + fmt::format("WatchMap: tried to remove a watch from untracked resource {}", name)); + + entry->second.erase(watch); + if (entry->second.empty()) { + watch_interest_.erase(entry); + newly_removed_from_subscription.insert(name); + } + } + return newly_removed_from_subscription; +} + +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/watch_map.h b/source/common/config/watch_map.h new file mode 100644 index 000000000000..5e75e5e88dd7 --- /dev/null +++ b/source/common/config/watch_map.h @@ -0,0 +1,115 @@ +#pragma once + +#include +#include +#include + +#include "envoy/config/subscription.h" + +#include "common/common/assert.h" +#include "common/common/logger.h" + +#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" + +namespace Envoy { +namespace Config { + +struct AddedRemoved { + AddedRemoved(std::set&& added, std::set&& removed) + : added_(std::move(added)), removed_(std::move(removed)) {} + std::set added_; + std::set removed_; +}; + +struct Watch { + Watch(SubscriptionCallbacks& callbacks) : callbacks_(callbacks) {} + SubscriptionCallbacks& callbacks_; + std::set resource_names_; // must be sorted set, for set_difference. +}; + +// NOTE: Users are responsible for eventually calling removeWatch() on the Watch* returned +// by addWatch(). We don't expect there to be new users of this class beyond +// NewGrpcMuxImpl and DeltaSubscriptionImpl (TODO(fredlas) to be renamed). +// +// Manages "watches" of xDS resources. Several xDS callers might ask for a subscription to the same +// resource name "X". The xDS machinery must return to each their very own subscription to X. +// The xDS machinery's "watch" concept accomplishes that, while avoiding parallel redundant xDS +// requests for X. Each of those subscriptions is viewed as a "watch" on X, while behind the scenes +// there is just a single real subscription to that resource name. +// +// This class maintains the watches<-->subscription mapping: it +// 1) delivers updates to all interested watches, and +// 2) tracks which resource names should be {added to,removed from} the subscription when the +// {first,last} watch on a resource name is {added,removed}. +// +// #1 is accomplished by WatchMap's implementation of the SubscriptionCallbacks interface. +// This interface allows the xDS client to just throw each xDS update message it receives directly +// into WatchMap::onConfigUpdate, rather than having to track the various watches' callbacks. +// +// The information for #2 is returned by updateWatchInterest(); the caller should use it to +// update the subscription accordingly. +// +// A WatchMap is assumed to be dedicated to a single type_url type of resource (EDS, CDS, etc). +class WatchMap : public SubscriptionCallbacks, public Logger::Loggable { +public: + WatchMap() = default; + + // Adds 'callbacks' to the WatchMap, with every possible resource being watched. + // (Use updateWatchInterest() to narrow it down to some specific names). + // Returns the newly added watch, to be used with updateWatchInterest and removeWatch. + Watch* addWatch(SubscriptionCallbacks& callbacks); + + // Updates the set of resource names that the given watch should watch. + // Returns any resource name additions/removals that are unique across all watches. That is: + // 1) if 'resources' contains X and no other watch cares about X, X will be in added_. + // 2) if 'resources' does not contain Y, and this watch was the only one that cared about Y, + // Y will be in removed_. + AddedRemoved updateWatchInterest(Watch* watch, + const std::set& update_to_these_names); + + // Expects that the watch to be removed has already had all of its resource names removed via + // updateWatchInterest(). + void removeWatch(Watch* watch); + + // SubscriptionCallbacks + void onConfigUpdate(const Protobuf::RepeatedPtrField& resources, + const std::string& version_info) override; + void onConfigUpdate(const Protobuf::RepeatedPtrField& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info) override; + + void onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) override; + + std::string resourceName(const ProtobufWkt::Any&) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + + WatchMap(const WatchMap&) = delete; + WatchMap& operator=(const WatchMap&) = delete; + +private: + // Given a list of names that are new to an individual watch, returns those names that are in fact + // new to the entire subscription. + std::set findAdditions(const std::vector& newly_added_to_watch, + Watch* watch); + + // Given a list of names that an individual watch no longer cares about, returns those names that + // in fact the entire subscription no longer cares about. + std::set findRemovals(const std::vector& newly_removed_from_watch, + Watch* watch); + + // Returns the union of watch_interest_[resource_name] and wildcard_watches_. + absl::flat_hash_set watchesInterestedIn(const std::string& resource_name); + + absl::flat_hash_set> watches_; + + // Watches whose interest set is currently empty, which is interpreted as "everything". + absl::flat_hash_set wildcard_watches_; + + // Maps a resource name to the set of watches interested in that resource. Has two purposes: + // 1) Acts as a reference count; no watches care anymore ==> the resource can be removed. + // 2) Enables efficient lookup of all interested watches when a resource has been updated. + absl::flat_hash_map> watch_interest_; +}; + +} // namespace Config +} // namespace Envoy diff --git a/source/docs/xDS_code_diagram.png b/source/docs/xDS_code_diagram.png new file mode 100644 index 000000000000..ef4df79cc1e3 Binary files /dev/null and b/source/docs/xDS_code_diagram.png differ diff --git a/test/common/config/BUILD b/test/common/config/BUILD index 509d423919a9..60101fa4aa22 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -274,6 +274,17 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "watch_map_test", + srcs = ["watch_map_test.cc"], + deps = [ + "//source/common/config:watch_map_lib", + "//test/mocks/config:config_mocks", + "//test/test_common:utility_lib", + "@envoy_api//envoy/api/v2:eds_cc", + ], +) + envoy_cc_test( name = "filter_json_test", srcs = ["filter_json_test.cc"], diff --git a/test/common/config/watch_map_test.cc b/test/common/config/watch_map_test.cc new file mode 100644 index 000000000000..543298557fab --- /dev/null +++ b/test/common/config/watch_map_test.cc @@ -0,0 +1,397 @@ +#include + +#include "envoy/api/v2/eds.pb.h" +#include "envoy/common/exception.h" +#include "envoy/stats/scope.h" + +#include "common/config/watch_map.h" + +#include "test/mocks/config/mocks.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using ::testing::_; +using ::testing::Invoke; + +namespace Envoy { +namespace Config { +namespace { + +class NamedMockSubscriptionCallbacks + : public MockSubscriptionCallbacks { +public: + std::string resourceName(const ProtobufWkt::Any& resource) override { + return TestUtility::anyConvert(resource).cluster_name(); + } +}; + +// expectDeltaAndSotwUpdate() EXPECTs two birds with one function call: we want to cover both SotW +// and delta, which, while mechanically different, can behave identically for our testing purposes. +// Specifically, as a simplification for these tests, every still-present resource is updated in +// every update. Therefore, a resource can never show up in the SotW update but not the delta +// update. We can therefore use the same expected_resources for both. +void expectDeltaAndSotwUpdate( + NamedMockSubscriptionCallbacks& callbacks, + const std::vector& expected_resources, + const std::vector& expected_removals, const std::string& version) { + EXPECT_CALL(callbacks, onConfigUpdate(_, version)) + .WillOnce(Invoke( + [expected_resources](const Protobuf::RepeatedPtrField& gotten_resources, + const std::string&) { + EXPECT_EQ(expected_resources.size(), gotten_resources.size()); + for (size_t i = 0; i < expected_resources.size(); i++) { + envoy::api::v2::ClusterLoadAssignment cur_gotten_resource; + gotten_resources[i].UnpackTo(&cur_gotten_resource); + EXPECT_TRUE(TestUtility::protoEqual(cur_gotten_resource, expected_resources[i])); + } + })); + EXPECT_CALL(callbacks, onConfigUpdate(_, _, _)) + .WillOnce( + Invoke([expected_resources, expected_removals, version]( + const Protobuf::RepeatedPtrField& gotten_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string&) { + EXPECT_EQ(expected_resources.size(), gotten_resources.size()); + for (size_t i = 0; i < expected_resources.size(); i++) { + EXPECT_EQ(gotten_resources[i].version(), version); + envoy::api::v2::ClusterLoadAssignment cur_gotten_resource; + gotten_resources[i].resource().UnpackTo(&cur_gotten_resource); + EXPECT_TRUE(TestUtility::protoEqual(cur_gotten_resource, expected_resources[i])); + } + EXPECT_EQ(expected_removals.size(), removed_resources.size()); + for (size_t i = 0; i < expected_removals.size(); i++) { + EXPECT_EQ(expected_removals[i], removed_resources[i]); + } + })); +} + +// Sometimes we want to verify that a delta onConfigUpdate simply doesn't happen. However, for SotW, +// every update triggers all onConfigUpdate()s, so we should still expect empty calls for that. +void expectNoDeltaUpdate(NamedMockSubscriptionCallbacks& callbacks, const std::string& version) { + EXPECT_CALL(callbacks, onConfigUpdate(_, version)) + .WillOnce(Invoke([](const Protobuf::RepeatedPtrField& gotten_resources, + const std::string&) { EXPECT_EQ(0, gotten_resources.size()); })); + EXPECT_CALL(callbacks, onConfigUpdate(_, _, _)).Times(0); +} + +Protobuf::RepeatedPtrField +wrapInResource(const Protobuf::RepeatedPtrField& anys, + const std::string& version) { + Protobuf::RepeatedPtrField ret; + for (const auto& a : anys) { + envoy::api::v2::ClusterLoadAssignment cur_endpoint; + a.UnpackTo(&cur_endpoint); + auto* cur_resource = ret.Add(); + cur_resource->set_name(cur_endpoint.cluster_name()); + cur_resource->mutable_resource()->CopyFrom(a); + cur_resource->set_version(version); + } + return ret; +} + +// Similar to expectDeltaAndSotwUpdate(), but making the onConfigUpdate() happen, rather than +// EXPECTing it. +void doDeltaAndSotwUpdate(SubscriptionCallbacks& watch_map, + const Protobuf::RepeatedPtrField& sotw_resources, + const std::vector& removed_names, + const std::string& version) { + watch_map.onConfigUpdate(sotw_resources, version); + + Protobuf::RepeatedPtrField delta_resources = + wrapInResource(sotw_resources, version); + Protobuf::RepeatedPtrField removed_names_proto; + for (const auto& n : removed_names) { + *removed_names_proto.Add() = n; + } + watch_map.onConfigUpdate(delta_resources, removed_names_proto, "version1"); +} + +// Tests the simple case of a single watch. Checks that the watch will not be told of updates to +// resources it doesn't care about. Checks that the watch can later decide it does care about them, +// and then receive subsequent updates to them. +TEST(WatchMapTest, Basic) { + NamedMockSubscriptionCallbacks callbacks; + WatchMap watch_map; + Watch* watch = watch_map.addWatch(callbacks); + + { + // The watch is interested in Alice and Bob... + std::set update_to({"alice", "bob"}); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch, update_to); + EXPECT_EQ(update_to, added_removed.added_); + EXPECT_TRUE(added_removed.removed_.empty()); + + // ...the update is going to contain Bob and Carol... + Protobuf::RepeatedPtrField updated_resources; + envoy::api::v2::ClusterLoadAssignment bob; + bob.set_cluster_name("bob"); + updated_resources.Add()->PackFrom(bob); + envoy::api::v2::ClusterLoadAssignment carol; + carol.set_cluster_name("carol"); + updated_resources.Add()->PackFrom(carol); + + // ...so the watch should receive only Bob. + std::vector expected_resources; + expected_resources.push_back(bob); + + expectDeltaAndSotwUpdate(callbacks, expected_resources, {}, "version1"); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version1"); + } + { + // The watch is now interested in Bob, Carol, Dave, Eve... + std::set update_to({"bob", "carol", "dave", "eve"}); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch, update_to); + EXPECT_EQ(std::set({"carol", "dave", "eve"}), added_removed.added_); + EXPECT_EQ(std::set({"alice"}), added_removed.removed_); + + // ...the update is going to contain Alice, Carol, Dave... + Protobuf::RepeatedPtrField updated_resources; + envoy::api::v2::ClusterLoadAssignment alice; + alice.set_cluster_name("alice"); + updated_resources.Add()->PackFrom(alice); + envoy::api::v2::ClusterLoadAssignment carol; + carol.set_cluster_name("carol"); + updated_resources.Add()->PackFrom(carol); + envoy::api::v2::ClusterLoadAssignment dave; + dave.set_cluster_name("dave"); + updated_resources.Add()->PackFrom(dave); + + // ...so the watch should receive only Carol and Dave. + std::vector expected_resources; + expected_resources.push_back(carol); + expected_resources.push_back(dave); + + expectDeltaAndSotwUpdate(callbacks, expected_resources, {"bob"}, "version2"); + doDeltaAndSotwUpdate(watch_map, updated_resources, {"bob"}, "version2"); + } +} + +// Checks the following: +// First watch on a resource name ==> updateWatchInterest() returns "add it to subscription" +// Second watch on that name ==> updateWatchInterest() returns nothing about that name +// Original watch loses interest ==> nothing +// Second watch also loses interest ==> "remove it from subscription" +// NOTE: we need the resource name "dummy" to keep either watch from ever having no names watched, +// which is treated as interest in all names. +TEST(WatchMapTest, Overlap) { + NamedMockSubscriptionCallbacks callbacks1; + NamedMockSubscriptionCallbacks callbacks2; + WatchMap watch_map; + Watch* watch1 = watch_map.addWatch(callbacks1); + Watch* watch2 = watch_map.addWatch(callbacks2); + + Protobuf::RepeatedPtrField updated_resources; + envoy::api::v2::ClusterLoadAssignment alice; + alice.set_cluster_name("alice"); + updated_resources.Add()->PackFrom(alice); + + // First watch becomes interested. + { + std::set update_to({"alice", "dummy"}); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch1, update_to); + EXPECT_EQ(update_to, added_removed.added_); // add to subscription + EXPECT_TRUE(added_removed.removed_.empty()); + watch_map.updateWatchInterest(watch2, {"dummy"}); + + // First watch receives update. + expectDeltaAndSotwUpdate(callbacks1, {alice}, {}, "version1"); + expectNoDeltaUpdate(callbacks2, "version1"); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version1"); + } + // Second watch becomes interested. + { + std::set update_to({"alice", "dummy"}); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch2, update_to); + EXPECT_TRUE(added_removed.added_.empty()); // nothing happens + EXPECT_TRUE(added_removed.removed_.empty()); + + // Both watches receive update. + expectDeltaAndSotwUpdate(callbacks1, {alice}, {}, "version2"); + expectDeltaAndSotwUpdate(callbacks2, {alice}, {}, "version2"); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version2"); + } + // First watch loses interest. + { + AddedRemoved added_removed = watch_map.updateWatchInterest(watch1, {"dummy"}); + EXPECT_TRUE(added_removed.added_.empty()); // nothing happens + EXPECT_TRUE(added_removed.removed_.empty()); + + // *Only* second watch receives update. + expectNoDeltaUpdate(callbacks1, "version3"); + expectDeltaAndSotwUpdate(callbacks2, {alice}, {}, "version3"); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version3"); + } + // Second watch loses interest. + { + AddedRemoved added_removed = watch_map.updateWatchInterest(watch2, {"dummy"}); + EXPECT_TRUE(added_removed.added_.empty()); + EXPECT_EQ(std::set({"alice"}), added_removed.removed_); // remove from subscription + } +} + +// Checks the following: +// First watch on a resource name ==> updateWatchInterest() returns "add it to subscription" +// Watch loses interest ==> "remove it from subscription" +// Second watch on that name ==> "add it to subscription" +// NOTE: we need the resource name "dummy" to keep either watch from ever having no names watched, +// which is treated as interest in all names. +TEST(WatchMapTest, AddRemoveAdd) { + NamedMockSubscriptionCallbacks callbacks1; + NamedMockSubscriptionCallbacks callbacks2; + WatchMap watch_map; + Watch* watch1 = watch_map.addWatch(callbacks1); + Watch* watch2 = watch_map.addWatch(callbacks2); + + Protobuf::RepeatedPtrField updated_resources; + envoy::api::v2::ClusterLoadAssignment alice; + alice.set_cluster_name("alice"); + updated_resources.Add()->PackFrom(alice); + + // First watch becomes interested. + { + std::set update_to({"alice", "dummy"}); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch1, update_to); + EXPECT_EQ(update_to, added_removed.added_); // add to subscription + EXPECT_TRUE(added_removed.removed_.empty()); + watch_map.updateWatchInterest(watch2, {"dummy"}); + + // First watch receives update. + expectDeltaAndSotwUpdate(callbacks1, {alice}, {}, "version1"); + expectNoDeltaUpdate(callbacks2, "version1"); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version1"); + } + // First watch loses interest. + { + AddedRemoved added_removed = watch_map.updateWatchInterest(watch1, {"dummy"}); + EXPECT_TRUE(added_removed.added_.empty()); + EXPECT_EQ(std::set({"alice"}), added_removed.removed_); // remove from subscription + + // (The xDS client should have responded to updateWatchInterest()'s return value by removing + // Alice from the subscription, so onConfigUpdate() calls should be impossible right now.) + } + // Second watch becomes interested. + { + std::set update_to({"alice", "dummy"}); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch2, update_to); + EXPECT_EQ(std::set({"alice"}), added_removed.added_); // add to subscription + EXPECT_TRUE(added_removed.removed_.empty()); + + // *Only* second watch receives update. + expectNoDeltaUpdate(callbacks1, "version2"); + expectDeltaAndSotwUpdate(callbacks2, {alice}, {}, "version2"); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version2"); + } +} + +// Tests that nothing breaks if an update arrives that we entirely do not care about. +TEST(WatchMapTest, UninterestingUpdate) { + NamedMockSubscriptionCallbacks callbacks; + WatchMap watch_map; + Watch* watch = watch_map.addWatch(callbacks); + watch_map.updateWatchInterest(watch, {"alice"}); + + Protobuf::RepeatedPtrField alice_update; + envoy::api::v2::ClusterLoadAssignment alice; + alice.set_cluster_name("alice"); + alice_update.Add()->PackFrom(alice); + + Protobuf::RepeatedPtrField bob_update; + envoy::api::v2::ClusterLoadAssignment bob; + bob.set_cluster_name("bob"); + bob_update.Add()->PackFrom(bob); + + expectNoDeltaUpdate(callbacks, "version1"); + doDeltaAndSotwUpdate(watch_map, bob_update, {}, "version1"); + + expectDeltaAndSotwUpdate(callbacks, {alice}, {}, "version2"); + doDeltaAndSotwUpdate(watch_map, alice_update, {}, "version2"); + + expectNoDeltaUpdate(callbacks, "version3"); + doDeltaAndSotwUpdate(watch_map, bob_update, {}, "version3"); + + // Clean removal of the watch: first update to "interested in nothing", then remove. + watch_map.updateWatchInterest(watch, {}); + watch_map.removeWatch(watch); + + // Finally, test that calling onConfigUpdate on a map with no watches doesn't break. + doDeltaAndSotwUpdate(watch_map, bob_update, {}, "version4"); +} + +// Tests that a watch that specifies no particular resource interest is treated as interested in +// everything. +TEST(WatchMapTest, WatchingEverything) { + NamedMockSubscriptionCallbacks callbacks1; + NamedMockSubscriptionCallbacks callbacks2; + WatchMap watch_map; + /*Watch* watch1 = */ watch_map.addWatch(callbacks1); + Watch* watch2 = watch_map.addWatch(callbacks2); + // watch1 never specifies any names, and so is treated as interested in everything. + watch_map.updateWatchInterest(watch2, {"alice"}); + + Protobuf::RepeatedPtrField updated_resources; + envoy::api::v2::ClusterLoadAssignment alice; + alice.set_cluster_name("alice"); + updated_resources.Add()->PackFrom(alice); + envoy::api::v2::ClusterLoadAssignment bob; + bob.set_cluster_name("bob"); + updated_resources.Add()->PackFrom(bob); + + std::vector expected_resources1; + expected_resources1.push_back(alice); + expected_resources1.push_back(bob); + std::vector expected_resources2; + expected_resources2.push_back(alice); + + expectDeltaAndSotwUpdate(callbacks1, expected_resources1, {}, "version1"); + expectDeltaAndSotwUpdate(callbacks2, expected_resources2, {}, "version1"); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version1"); +} + +// Delta onConfigUpdate has some slightly subtle details with how it handles the three cases where a +// watch receives {only updates, updates+removals, only removals} to its resources. This test +// exercise those cases. Also, the removal-only case tests that SotW does call a watch's +// onConfigUpdate even if none of the watch's interested resources are among the updated resources. +// (Which ensures we deliver empty config updates when a resource is dropped.) +TEST(WatchMapTest, DeltaOnConfigUpdate) { + NamedMockSubscriptionCallbacks callbacks1; + NamedMockSubscriptionCallbacks callbacks2; + NamedMockSubscriptionCallbacks callbacks3; + WatchMap watch_map; + Watch* watch1 = watch_map.addWatch(callbacks1); + Watch* watch2 = watch_map.addWatch(callbacks2); + Watch* watch3 = watch_map.addWatch(callbacks3); + watch_map.updateWatchInterest(watch1, {"updated"}); + watch_map.updateWatchInterest(watch2, {"updated", "removed"}); + watch_map.updateWatchInterest(watch3, {"removed"}); + + Protobuf::RepeatedPtrField update; + envoy::api::v2::ClusterLoadAssignment updated; + updated.set_cluster_name("updated"); + update.Add()->PackFrom(updated); + + expectDeltaAndSotwUpdate(callbacks1, {updated}, {}, "version1"); // only update + expectDeltaAndSotwUpdate(callbacks2, {updated}, {"removed"}, "version1"); // update+remove + expectDeltaAndSotwUpdate(callbacks3, {}, {"removed"}, "version1"); // only remove + doDeltaAndSotwUpdate(watch_map, update, {"removed"}, "version1"); +} + +TEST(WatchMapTest, OnConfigUpdateFailed) { + WatchMap watch_map; + // calling on empty map doesn't break + watch_map.onConfigUpdateFailed(ConfigUpdateFailureReason::UpdateRejected, nullptr); + + NamedMockSubscriptionCallbacks callbacks1; + NamedMockSubscriptionCallbacks callbacks2; + watch_map.addWatch(callbacks1); + watch_map.addWatch(callbacks2); + + EXPECT_CALL(callbacks1, onConfigUpdateFailed(ConfigUpdateFailureReason::UpdateRejected, nullptr)); + EXPECT_CALL(callbacks2, onConfigUpdateFailed(ConfigUpdateFailureReason::UpdateRejected, nullptr)); + watch_map.onConfigUpdateFailed(ConfigUpdateFailureReason::UpdateRejected, nullptr); +} + +} // namespace +} // namespace Config +} // namespace Envoy diff --git a/tools/spelling_dictionary.txt b/tools/spelling_dictionary.txt index 50abfff89e48..3f935e236cef 100644 --- a/tools/spelling_dictionary.txt +++ b/tools/spelling_dictionary.txt @@ -85,6 +85,7 @@ EVAL EVLOOP EVP EWOULDBLOCK +EXPECTs EXPR FAQ FDs @@ -468,6 +469,7 @@ evthread evwatch exe execlp +expectDeltaAndSotwUpdate facto favicon fd @@ -624,6 +626,7 @@ params paren parentid parsers +passthroughs pcall pcap pclose