Skip to content

Commit

Permalink
Refactor listener related interface and data structure (#4927)
Browse files Browse the repository at this point in the history
* refactor listener data structure

* remove unrelated topo listener
  • Loading branch information
critical27 authored Nov 24, 2022
1 parent 84bc89a commit 819b84d
Show file tree
Hide file tree
Showing 14 changed files with 338 additions and 290 deletions.
124 changes: 67 additions & 57 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1063,86 +1063,96 @@ void MetaClient::listenerDiff(const LocalCache& oldCache, const LocalCache& newC
return;
}

VLOG(1) << "Let's check if any listeners parts added for " << options_.localHost_;
for (auto& spaceEntry : newMap) {
auto spaceId = spaceEntry.first;
VLOG(1) << "Let's check if any listeners is updated for " << options_.localHost_;
for (auto& [spaceId, typeMap] : newMap) {
auto oldSpaceIter = oldMap.find(spaceId);
if (oldSpaceIter == oldMap.end()) {
// new space is added
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added!";
listener_->onSpaceAdded(spaceId, true);
for (const auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
// create all type of listener when new space listener added
for (const auto& [type, listenerParts] : typeMap) {
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added, type is "
<< apache::thrift::util::enumNameSafe(type);
listener_->onListenerSpaceAdded(spaceId, type);
for (const auto& newListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
}
} else {
// check if new part listener is added
for (auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
auto oldPartIter = oldSpaceIter->second.find(partId);
if (oldPartIter == oldSpaceIter->second.end()) {
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
for (auto& [type, listenerParts] : typeMap) {
auto oldTypeIter = oldSpaceIter->second.find(type);
// create missing type of listener when new type of listener added
if (oldTypeIter == oldSpaceIter->second.end()) {
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added, type is "
<< apache::thrift::util::enumNameSafe(type);
listener_->onListenerSpaceAdded(spaceId, type);
for (const auto& newListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
} else {
std::sort(partEntry.second.begin(), partEntry.second.end());
std::sort(oldPartIter->second.begin(), oldPartIter->second.end());
// create missing part of listener of specified type
std::sort(listenerParts.begin(), listenerParts.end());
std::sort(oldTypeIter->second.begin(), oldTypeIter->second.end());
std::vector<ListenerHosts> diff;
std::set_difference(partEntry.second.begin(),
partEntry.second.end(),
oldPartIter->second.begin(),
oldPartIter->second.end(),
std::set_difference(listenerParts.begin(),
listenerParts.end(),
oldTypeIter->second.begin(),
oldTypeIter->second.end(),
std::back_inserter(diff));
for (const auto& info : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
for (const auto& newListener : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
}
}
}
}

VLOG(1) << "Let's check if any old listeners removed....";
for (auto& spaceEntry : oldMap) {
auto spaceId = spaceEntry.first;
VLOG(1) << "Let's check if any listeners is removed from " << options_.localHost_;
for (auto& [spaceId, typeMap] : oldMap) {
auto newSpaceIter = newMap.find(spaceId);
if (newSpaceIter == newMap.end()) {
// remove old space
for (const auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
for (const auto& info : partEntry.second) {
VLOG(1) << "SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
// remove all type of listener when space listener removed
for (const auto& [type, listenerParts] : typeMap) {
for (const auto& outdateListener : listenerParts) {
VLOG(1) << "SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
listener_->onListenerSpaceRemoved(spaceId, type);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed, type is "
<< apache::thrift::util::enumNameSafe(type);
}
listener_->onSpaceRemoved(spaceId, true);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed!";
} else {
// check if part listener is removed
for (auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
auto newPartIter = newSpaceIter->second.find(partId);
if (newPartIter == newSpaceIter->second.end()) {
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
for (auto& [type, listenerParts] : typeMap) {
auto newTypeIter = newSpaceIter->second.find(type);
// remove specified type of listener
if (newTypeIter == newSpaceIter->second.end()) {
for (const auto& outdateListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed!";
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
listener_->onListenerSpaceRemoved(spaceId, type);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed, type is "
<< apache::thrift::util::enumNameSafe(type);
} else {
std::sort(partEntry.second.begin(), partEntry.second.end());
std::sort(newPartIter->second.begin(), newPartIter->second.end());
// remove outdate part of listener of specified type
std::sort(listenerParts.begin(), listenerParts.end());
std::sort(newTypeIter->second.begin(), newTypeIter->second.end());
std::vector<ListenerHosts> diff;
std::set_difference(partEntry.second.begin(),
partEntry.second.end(),
newPartIter->second.begin(),
newPartIter->second.end(),
std::set_difference(listenerParts.begin(),
listenerParts.end(),
newTypeIter->second.begin(),
newTypeIter->second.end(),
std::back_inserter(diff));
for (const auto& info : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
for (const auto& outdateListener : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed!";
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
}
}
Expand Down Expand Up @@ -2929,7 +2939,7 @@ ListenersMap MetaClient::doGetListenersMap(const HostAddr& host, const LocalCach
auto partIter = space.second->partsAlloc_.find(partId);
if (partIter != space.second->partsAlloc_.end()) {
auto peers = partIter->second;
listenersMap[spaceId][partId].emplace_back(std::move(type), std::move(peers));
listenersMap[spaceId][type].emplace_back(partId, std::move(peers));
} else {
FLOG_WARN("%s has listener of [%d, %d], but can't find part peers",
host.toString().c_str(),
Expand Down
19 changes: 11 additions & 8 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ class MetaChangedListener {
public:
virtual ~MetaChangedListener() = default;

virtual void onSpaceAdded(GraphSpaceID spaceId, bool isListener = false) = 0;
virtual void onSpaceRemoved(GraphSpaceID spaceId, bool isListener = false) = 0;
virtual void onSpaceAdded(GraphSpaceID spaceId) = 0;
virtual void onSpaceRemoved(GraphSpaceID spaceId) = 0;
virtual void onSpaceOptionUpdated(
GraphSpaceID spaceId, const std::unordered_map<std::string, std::string>& options) = 0;
virtual void onPartAdded(const PartHosts& partHosts) = 0;
Expand All @@ -169,12 +169,15 @@ class MetaChangedListener {
virtual void fetchLeaderInfo(
std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>>& leaders) = 0;
virtual void fetchDiskParts(kvstore::SpaceDiskPartsMap& diskParts) = 0;
virtual void onListenerAdded(GraphSpaceID spaceId,
PartitionID partId,
const ListenerHosts& listenerHosts) = 0;
virtual void onListenerRemoved(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type) = 0;
virtual void onListenerSpaceAdded(GraphSpaceID spaceId, cpp2::ListenerType type) = 0;
virtual void onListenerSpaceRemoved(GraphSpaceID spaceId, cpp2::ListenerType type) = 0;
virtual void onListenerPartAdded(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type,
const std::vector<HostAddr>& peers) = 0;
virtual void onListenerPartRemoved(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type) = 0;
virtual void onCheckRemoteListeners(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<HostAddr>& remoteListeners) = 0;
Expand Down
15 changes: 8 additions & 7 deletions src/common/meta/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,30 @@ struct PartHosts {

// ListenerHosts saves the listener type and the peers of the data replica
struct ListenerHosts {
ListenerHosts(cpp2::ListenerType type, std::vector<HostAddr> peers)
: type_(std::move(type)), peers_(std::move(peers)) {}
ListenerHosts(PartitionID partId, std::vector<HostAddr> peers)
: partId_(partId), peers_(std::move(peers)) {}

bool operator==(const ListenerHosts& rhs) const {
return this->type_ == rhs.type_ && this->peers_ == rhs.peers_;
return this->partId_ == rhs.partId_ && this->peers_ == rhs.peers_;
}

bool operator<(const ListenerHosts& rhs) const {
if (this->type_ == rhs.type_) {
if (this->partId_ == rhs.partId_) {
return this->peers_ < rhs.peers_;
}
return this->type_ < rhs.type_;
return this->partId_ < rhs.partId_;
}

cpp2::ListenerType type_;
PartitionID partId_;
// peers is the part peers which would send logs to the listener
std::vector<HostAddr> peers_;
};

using PartsMap = std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, PartHosts>>;
// ListenersMap is used for listener replica to get its peers of data replica
using ListenersMap =
std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, std::vector<ListenerHosts>>>;
std::unordered_map<GraphSpaceID,
std::unordered_map<cpp2::ListenerType, std::vector<ListenerHosts>>>;
// RemoteListenerInfo is pair of <listener host, listener type>
using RemoteListenerInfo = std::pair<HostAddr, cpp2::ListenerType>;
// RemoteListeners is used for data replica to check if some part has remote
Expand Down
6 changes: 2 additions & 4 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ Listener::Listener(GraphSpaceID spaceId,
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan,
std::shared_ptr<RaftClient> clientMan,
std::shared_ptr<DiskManager> diskMan,
meta::SchemaManager* schemaMan)
std::shared_ptr<DiskManager> diskMan)
: RaftPart(FLAGS_cluster_id,
spaceId,
partId,
Expand All @@ -38,8 +37,7 @@ Listener::Listener(GraphSpaceID spaceId,
handlers,
snapshotMan,
clientMan,
diskMan),
schemaMan_(schemaMan) {}
diskMan) {}

void Listener::start(std::vector<HostAddr>&& peers, bool) {
std::lock_guard<std::mutex> g(raftLock_);
Expand Down
4 changes: 1 addition & 3 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ class Listener : public raftex::RaftPart {
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan,
std::shared_ptr<RaftClient> clientMan,
std::shared_ptr<DiskManager> diskMan,
meta::SchemaManager* schemaMan);
std::shared_ptr<DiskManager> diskMan);

/**
* @brief Initialize listener, all Listener must call this method
Expand Down Expand Up @@ -278,7 +277,6 @@ class Listener : public raftex::RaftPart {
LogID lastApplyLogId_ = 0;
int64_t lastApplyTime_ = 0;
std::set<HostAddr> peers_;
meta::SchemaManager* schemaMan_{nullptr};
};

} // namespace kvstore
Expand Down
39 changes: 0 additions & 39 deletions src/kvstore/ListenerFactory.h

This file was deleted.

Loading

0 comments on commit 819b84d

Please sign in to comment.