-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
config: WatchMap: cleaner management of watches #7108
Merged
Merged
Changes from 33 commits
Commits
Show all changes
40 commits
Select commit
Hold shift + click to select a range
2ff7a0d
initial WatchMap snapshot
fredlas 502e3b7
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas 3816de1
cleanup and comments
fredlas 2d7a247
compiles and a basic test passes
fredlas d823320
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas d30e229
add two more tests, and fix a bug, thanks unit testing
fredlas 2ce83f4
test delta too
fredlas df14958
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas 33aff0f
tiny rearrangement
fredlas 67dcc67
spellcheck
fredlas 2ac1518
unmock resourceName to enable switching off of NiceMock
fredlas fdb0a18
add a test, fix another bug, thanks again unit testing
fredlas 21f428a
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas 38f13cc
support watches that want to watch everything by providing no names
fredlas b8b7b08
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas eb0f380
fix compile after merge
fredlas 00f7f55
snapshot
fredlas 943949a
change std pair to AddedRemoved
fredlas c61a4aa
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas 5582614
remove virtual
fredlas 3ac614e
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas a8d53c2
add PNG diagram of intended usage of WatchMap
fredlas e571c53
add interface, move WatchMap to WatchMapImpl
fredlas db4fd7a
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas 3ce07f4
privatize SubscriptionCallbacks
fredlas 34dd5ce
move AddedRemoved, move png
fredlas 885bc2c
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas ea4a01f
private friend Watch
fredlas 37ffff6
clang tidy
fredlas 4900eb9
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas c238f1f
change Watch to class and interface
fredlas 46e3fa9
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas 8010950
add another word to the dictionary
fredlas 8df252b
back to earlier design
fredlas 76bd613
merge conflict, realphabetize BUILD
fredlas 4dba42d
clang tidy
fredlas 692e982
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas 8a4ae35
merge conflict
fredlas 064fe75
merge conflict
fredlas 3a1d2ba
snapshot
fredlas File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
#pragma once | ||
|
||
#include <set> | ||
#include <string> | ||
|
||
#include "envoy/common/pure.h" | ||
#include "envoy/config/subscription.h" | ||
|
||
namespace Envoy { | ||
namespace Config { | ||
|
||
// Watch and WatchMap together manage "watches" of xDS resources. Several callers might ask | ||
// for subscriptions to the same xDS resource "X". The xDS machinery must give each their | ||
// very own Subscription object that receives updates on X, but we can't be sending multiple | ||
// redundant requests to the server. Watch+WatchMap avoid that: each of those Subscriptions | ||
// just holds a Watch on X; behind the scenes, GrpcMux (instructed by WatchMap) manages the | ||
// actual xDS protocol requests for X. | ||
// | ||
// All of this is implicitly within the context of a given type_url (EDS, CDS, etc), and unaware | ||
// of the watches for the other type_urls. | ||
|
||
// pair<set<string>, set<string>>, but with meaningful field names. | ||
struct AddedRemoved { | ||
AddedRemoved(std::set<std::string>&& added, std::set<std::string>&& removed) | ||
: added_(std::move(added)), removed_(std::move(removed)) {} | ||
std::set<std::string> added_; | ||
std::set<std::string> removed_; | ||
}; | ||
|
||
// A Watch object tracks the xDS resource names that some object in the wider Envoy codebase is | ||
// interested in. The union of these names becomes the xDS subscription interest. | ||
class Watch : public SubscriptionCallbacks { | ||
public: | ||
virtual ~Watch() = default; | ||
|
||
// Informs the parent WatchMap of an update to this Watch's set of watched resource names. | ||
// The resource names in the returned AddedRemoved should be added to/removed from the actual | ||
// conversation with the xDS server. | ||
virtual AddedRemoved updateWatchInterest(const std::set<std::string>& update_to_these_names) PURE; | ||
}; | ||
using WatchPtr = std::unique_ptr<Watch>; | ||
|
||
// WatchMap tracks all of the Watches for a given type_url. When an individual Watch's interest | ||
// changes, its parent WatchMap records the change, and determines what (if any) change to the | ||
// overall xDS subscription interest is needed, based on all other Watches' interests. | ||
class WatchMap { | ||
public: | ||
virtual ~WatchMap() = default; | ||
|
||
// Adds 'callbacks' to the WatchMap as a wildcard watch. You can later call | ||
// Watch::updateWatchInterest() to replace the wildcard matching with specific names. | ||
// Returns ownership of the newly added watch. Destroy to remove from map. | ||
virtual WatchPtr addWatch(SubscriptionCallbacks& callbacks) PURE; | ||
|
||
// Intended to be called only by the Watch's destructor. | ||
// Expects that the watch to be removed has already had all of its resource names removed via | ||
// updateWatchInterest(). | ||
virtual void removeWatch(Watch* watch) PURE; | ||
|
||
// While set to true (which is the default state of a newly added Watch), 'watch' will receive | ||
// all resource updates in each new config update message. | ||
virtual void setWildcardness(Watch* watch, bool is_wildcard) PURE; | ||
|
||
// Given a list of names that are new to an individual watch, returns those names that are in fact | ||
// new to the entire subscription. | ||
virtual std::set<std::string> findAdditions(const std::vector<std::string>& newly_added_to_watch, | ||
Watch* watch) PURE; | ||
|
||
// Given a list of names that an individual watch no longer cares about, returns those names that | ||
// in fact the entire subscription no longer cares about. | ||
virtual std::set<std::string> | ||
findRemovals(const std::vector<std::string>& newly_removed_from_watch, Watch* watch) PURE; | ||
fredlas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
|
||
} // namespace Config | ||
} // namespace Envoy |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
#include "common/config/watch_impl.h" | ||
|
||
namespace Envoy { | ||
namespace Config { | ||
|
||
// The return value logic: | ||
// 1) if update_to_these_names contains X, and no other Watch in the parent WatchMap | ||
// cares about X, then X will be in added_. | ||
// 2) if update_to_these_names does not contain Y, and this Watch was the only one in the | ||
// WatchMap that cared about Y, then Y will be in removed_. | ||
AddedRemoved WatchImpl::updateWatchInterest(const std::set<std::string>& update_to_these_names) { | ||
parent_map_.setWildcardness(this, update_to_these_names.empty()); | ||
|
||
std::vector<std::string> newly_added_to_watch; | ||
std::set_difference(update_to_these_names.begin(), update_to_these_names.end(), | ||
resource_names_.begin(), resource_names_.end(), | ||
std::inserter(newly_added_to_watch, newly_added_to_watch.begin())); | ||
|
||
std::vector<std::string> newly_removed_from_watch; | ||
std::set_difference(resource_names_.begin(), resource_names_.end(), update_to_these_names.begin(), | ||
update_to_these_names.end(), | ||
std::inserter(newly_removed_from_watch, newly_removed_from_watch.begin())); | ||
|
||
resource_names_ = update_to_these_names; | ||
|
||
return AddedRemoved(parent_map_.findAdditions(newly_added_to_watch, this), | ||
parent_map_.findRemovals(newly_removed_from_watch, this)); | ||
} | ||
|
||
void WatchImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources, | ||
const std::string& version_info) { | ||
callbacks_.onConfigUpdate(resources, version_info); | ||
} | ||
|
||
void WatchImpl::onConfigUpdate( | ||
const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources, | ||
const Protobuf::RepeatedPtrField<std::string>& removed_resources, | ||
const std::string& system_version_info) { | ||
callbacks_.onConfigUpdate(added_resources, removed_resources, system_version_info); | ||
} | ||
|
||
void WatchImpl::onConfigUpdateFailed(const EnvoyException* e) { | ||
callbacks_.onConfigUpdateFailed(e); | ||
} | ||
|
||
std::string WatchImpl::resourceName(const ProtobufWkt::Any& resource) { | ||
return callbacks_.resourceName(resource); | ||
} | ||
|
||
} // namespace Config | ||
} // namespace Envoy |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
#pragma once | ||
|
||
#include <set> | ||
#include <string> | ||
|
||
#include "envoy/config/watch_map.h" | ||
|
||
namespace Envoy { | ||
namespace Config { | ||
|
||
class WatchImpl : public Watch { | ||
public: | ||
WatchImpl(WatchMap& parent_map, SubscriptionCallbacks& callbacks) | ||
: parent_map_(parent_map), callbacks_(callbacks) {} | ||
~WatchImpl() override { parent_map_.removeWatch(this); } | ||
|
||
AddedRemoved updateWatchInterest(const std::set<std::string>& update_to_these_names) override; | ||
|
||
private: | ||
// SubscriptionCallbacks (all passthroughs to callbacks_) | ||
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources, | ||
const std::string& version_info) override; | ||
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources, | ||
const Protobuf::RepeatedPtrField<std::string>& removed_resources, | ||
const std::string& system_version_info) override; | ||
void onConfigUpdateFailed(const EnvoyException* e) override; | ||
std::string resourceName(const ProtobufWkt::Any&) override; | ||
|
||
WatchMap& parent_map_; | ||
SubscriptionCallbacks& callbacks_; | ||
std::set<std::string> resource_names_; // must be sorted set, for set_difference. | ||
}; | ||
|
||
} // namespace Config | ||
} // namespace Envoy |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
#include "common/config/watch_map_impl.h" | ||
|
||
#include "common/config/watch_impl.h" | ||
|
||
namespace Envoy { | ||
namespace Config { | ||
|
||
WatchPtr WatchMapImpl::addWatch(SubscriptionCallbacks& callbacks) { | ||
auto watch = std::make_unique<WatchImpl>(*this, callbacks); | ||
wildcard_watches_.insert(watch.get()); | ||
watches_.insert(watch.get()); | ||
return watch; | ||
} | ||
|
||
void WatchMapImpl::removeWatch(Watch* watch) { | ||
wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone. | ||
watches_.erase(watch); | ||
} | ||
|
||
void WatchMapImpl::setWildcardness(Watch* watch, bool is_wildcard) { | ||
if (is_wildcard) { | ||
wildcard_watches_.insert(watch); | ||
} else { | ||
wildcard_watches_.erase(watch); | ||
} | ||
} | ||
|
||
absl::flat_hash_set<Watch*> WatchMapImpl::watchesInterestedIn(const std::string& resource_name) { | ||
// Note that std::set_union needs sorted sets. Better to do it ourselves with insert(). | ||
absl::flat_hash_set<Watch*> ret = wildcard_watches_; | ||
auto watches_interested = watch_interest_.find(resource_name); | ||
if (watches_interested != watch_interest_.end()) { | ||
for (const auto& watch : watches_interested->second) { | ||
ret.insert(watch); | ||
} | ||
} | ||
return ret; | ||
} | ||
|
||
void WatchMapImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources, | ||
const std::string& version_info) { | ||
if (watches_.empty()) { | ||
ENVOY_LOG(warn, "WatchMapImpl::onConfigUpdate: there are no watches!"); | ||
return; | ||
} | ||
|
||
// Build a map from watches, to the set of updated resources that each watch cares about. Each | ||
// entry in the map is then a nice little bundle that can be fed directly into the individual | ||
// onConfigUpdate()s. | ||
absl::flat_hash_map<Watch*, Protobuf::RepeatedPtrField<ProtobufWkt::Any>> per_watch_updates; | ||
for (const auto& r : resources) { | ||
const absl::flat_hash_set<Watch*>& interested_in_r = | ||
watchesInterestedIn((*watches_.begin())->resourceName(r)); | ||
for (const auto& interested_watch : interested_in_r) { | ||
per_watch_updates[interested_watch].Add()->CopyFrom(r); | ||
} | ||
} | ||
|
||
// We just bundled up the updates into nice per-watch packages. Now, deliver them. | ||
for (auto& watch : watches_) { | ||
auto this_watch_updates = per_watch_updates.find(watch); | ||
if (this_watch_updates == per_watch_updates.end()) { | ||
// This update included no resources this watch cares about - so we do an empty | ||
// onConfigUpdate(), to notify the watch that its resources - if they existed before this - | ||
// were dropped. | ||
watch->onConfigUpdate({}, version_info); | ||
} else { | ||
watch->onConfigUpdate(this_watch_updates->second, version_info); | ||
} | ||
} | ||
} | ||
|
||
void WatchMapImpl::onConfigUpdate( | ||
const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources, | ||
const Protobuf::RepeatedPtrField<std::string>& removed_resources, | ||
const std::string& system_version_info) { | ||
// Build a pair of maps: from watches, to the set of resources {added,removed} that each watch | ||
// cares about. Each entry in the map-pair is then a nice little bundle that can be fed directly | ||
// into the individual onConfigUpdate()s. | ||
absl::flat_hash_map<Watch*, Protobuf::RepeatedPtrField<envoy::api::v2::Resource>> per_watch_added; | ||
for (const auto& r : added_resources) { | ||
const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r.name()); | ||
for (const auto& interested_watch : interested_in_r) { | ||
per_watch_added[interested_watch].Add()->CopyFrom(r); | ||
} | ||
} | ||
absl::flat_hash_map<Watch*, Protobuf::RepeatedPtrField<std::string>> per_watch_removed; | ||
for (const auto& r : removed_resources) { | ||
const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r); | ||
for (const auto& interested_watch : interested_in_r) { | ||
*per_watch_removed[interested_watch].Add() = r; | ||
} | ||
} | ||
|
||
// We just bundled up the updates into nice per-watch packages. Now, deliver them. | ||
for (const auto& added : per_watch_added) { | ||
Watch* cur_watch = added.first; | ||
auto removed = per_watch_removed.find(cur_watch); | ||
if (removed == per_watch_removed.end()) { | ||
// additions only, no removals | ||
cur_watch->onConfigUpdate(added.second, {}, system_version_info); | ||
} else { | ||
// both additions and removals | ||
cur_watch->onConfigUpdate(added.second, removed->second, system_version_info); | ||
// Drop the removals now, so the final removals-only pass won't use them. | ||
per_watch_removed.erase(removed); | ||
} | ||
} | ||
// Any removals-only updates will not have been picked up in the per_watch_added loop. | ||
for (auto& removed : per_watch_removed) { | ||
removed.first->onConfigUpdate({}, removed.second, system_version_info); | ||
} | ||
} | ||
|
||
void WatchMapImpl::onConfigUpdateFailed(const EnvoyException* e) { | ||
for (auto& watch : watches_) { | ||
watch->onConfigUpdateFailed(e); | ||
} | ||
} | ||
|
||
std::set<std::string> | ||
WatchMapImpl::findAdditions(const std::vector<std::string>& newly_added_to_watch, Watch* watch) { | ||
std::set<std::string> newly_added_to_subscription; | ||
for (const auto& name : newly_added_to_watch) { | ||
auto entry = watch_interest_.find(name); | ||
if (entry == watch_interest_.end()) { | ||
newly_added_to_subscription.insert(name); | ||
watch_interest_[name] = {watch}; | ||
} else { | ||
entry->second.insert(watch); | ||
} | ||
} | ||
return newly_added_to_subscription; | ||
} | ||
|
||
std::set<std::string> | ||
WatchMapImpl::findRemovals(const std::vector<std::string>& newly_removed_from_watch, Watch* watch) { | ||
std::set<std::string> newly_removed_from_subscription; | ||
for (const auto& name : newly_removed_from_watch) { | ||
auto entry = watch_interest_.find(name); | ||
if (entry == watch_interest_.end()) { | ||
ENVOY_LOG(warn, "WatchMapImpl: tried to remove a watch from untracked resource {}", name); | ||
continue; | ||
} | ||
|
||
entry->second.erase(watch); | ||
if (entry->second.empty()) { | ||
watch_interest_.erase(entry); | ||
newly_removed_from_subscription.insert(name); | ||
} | ||
} | ||
return newly_removed_from_subscription; | ||
} | ||
|
||
} // namespace Config | ||
} // namespace Envoy |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would still prefer a RAII pattern here. This is a pretty clear example of resource reclamation, and fits the C++ RAII paradigm closely. This gets back to the earlier point about the parent pointer; we do this in a bunch of places in Envoy, the idea is that resources know how to clean themselves up and contain the necessary logic to do so via scope removal. This is a safer pattern (in particular in the presence of exception) and avoids the client code needing to futz with handles.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I feel we're circling around quite a bit on this one. Maybe the best thing to do at this point is bring in another reviewer who can break deadlock, since I'm probably just being stubborn. @envoyproxy/maintainers is anyone interested in providing some additional perspective on this PR (in the space of refactoring around incremental xDS)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand what you're asking for. removeWatch() is only called by the destructor of Watch. Its purpose is to clean up WatchMap internal state in response to the Watch being removed. This all seems fully RAIIified already. I guess removeWatch() could become private, and Watch could become a friend?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, I think something like this makes sense. I.e. we shouldn't have to expose out on a public interface any removal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, SGTM, done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You say done here, but it's still on the virtual interface :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, looks like I missed that when I had Watch as a friend. However, now that Watch is more separated out, and "communicating" with WatchMap rather than WatchMap just using Watch structs as bits of state, isn't it necessary to be in the interface?