diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 87edb0e4156..8e095331603 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -1063,86 +1063,96 @@ void MetaClient::listenerDiff(const LocalCache& oldCache, const LocalCache& newC return; } - VLOG(1) << "Let's check if any listeners parts added for " << options_.localHost_; - for (auto& spaceEntry : newMap) { - auto spaceId = spaceEntry.first; + VLOG(1) << "Let's check if any listeners is updated for " << options_.localHost_; + for (auto& [spaceId, typeMap] : newMap) { auto oldSpaceIter = oldMap.find(spaceId); if (oldSpaceIter == oldMap.end()) { - // new space is added - VLOG(1) << "[Listener] SpaceId " << spaceId << " was added!"; - listener_->onSpaceAdded(spaceId, true); - for (const auto& partEntry : spaceEntry.second) { - auto partId = partEntry.first; - for (const auto& info : partEntry.second) { - VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!"; - listener_->onListenerAdded(spaceId, partId, info); + // create all type of listener when new space listener added + for (const auto& [type, listenerParts] : typeMap) { + VLOG(1) << "[Listener] SpaceId " << spaceId << " was added, type is " + << apache::thrift::util::enumNameSafe(type); + listener_->onListenerSpaceAdded(spaceId, type); + for (const auto& newListener : listenerParts) { + VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_ + << " was added, type is " << apache::thrift::util::enumNameSafe(type); + listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_); } } } else { - // check if new part listener is added - for (auto& partEntry : spaceEntry.second) { - auto partId = partEntry.first; - auto oldPartIter = oldSpaceIter->second.find(partId); - if (oldPartIter == oldSpaceIter->second.end()) { - for (const auto& info : partEntry.second) { - VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!"; - listener_->onListenerAdded(spaceId, partId, info); + for (auto& [type, listenerParts] : typeMap) { + auto oldTypeIter = oldSpaceIter->second.find(type); + // create missing type of listener when new type of listener added + if (oldTypeIter == oldSpaceIter->second.end()) { + VLOG(1) << "[Listener] SpaceId " << spaceId << " was added, type is " + << apache::thrift::util::enumNameSafe(type); + listener_->onListenerSpaceAdded(spaceId, type); + for (const auto& newListener : listenerParts) { + VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_ + << " was added, type is " << apache::thrift::util::enumNameSafe(type); + listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_); } } else { - std::sort(partEntry.second.begin(), partEntry.second.end()); - std::sort(oldPartIter->second.begin(), oldPartIter->second.end()); + // create missing part of listener of specified type + std::sort(listenerParts.begin(), listenerParts.end()); + std::sort(oldTypeIter->second.begin(), oldTypeIter->second.end()); std::vector diff; - std::set_difference(partEntry.second.begin(), - partEntry.second.end(), - oldPartIter->second.begin(), - oldPartIter->second.end(), + std::set_difference(listenerParts.begin(), + listenerParts.end(), + oldTypeIter->second.begin(), + oldTypeIter->second.end(), std::back_inserter(diff)); - for (const auto& info : diff) { - VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!"; - listener_->onListenerAdded(spaceId, partId, info); + for (const auto& newListener : diff) { + VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_ + << " was added, type is " << apache::thrift::util::enumNameSafe(type); + listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_); } } } } } - VLOG(1) << "Let's check if any old listeners removed...."; - for (auto& spaceEntry : oldMap) { - auto spaceId = spaceEntry.first; + VLOG(1) << "Let's check if any listeners is removed from " << options_.localHost_; + for (auto& [spaceId, typeMap] : oldMap) { auto newSpaceIter = newMap.find(spaceId); if (newSpaceIter == newMap.end()) { - // remove old space - for (const auto& partEntry : spaceEntry.second) { - auto partId = partEntry.first; - for (const auto& info : partEntry.second) { - VLOG(1) << "SpaceId " << spaceId << ", partId " << partId << " was removed!"; - listener_->onListenerRemoved(spaceId, partId, info.type_); + // remove all type of listener when space listener removed + for (const auto& [type, listenerParts] : typeMap) { + for (const auto& outdateListener : listenerParts) { + VLOG(1) << "SpaceId " << spaceId << ", partId " << outdateListener.partId_ + << " was removed, type is " << apache::thrift::util::enumNameSafe(type); + listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type); } + listener_->onListenerSpaceRemoved(spaceId, type); + VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed, type is " + << apache::thrift::util::enumNameSafe(type); } - listener_->onSpaceRemoved(spaceId, true); - VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed!"; } else { - // check if part listener is removed - for (auto& partEntry : spaceEntry.second) { - auto partId = partEntry.first; - auto newPartIter = newSpaceIter->second.find(partId); - if (newPartIter == newSpaceIter->second.end()) { - for (const auto& info : partEntry.second) { - VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was removed!"; - listener_->onListenerRemoved(spaceId, partId, info.type_); + for (auto& [type, listenerParts] : typeMap) { + auto newTypeIter = newSpaceIter->second.find(type); + // remove specified type of listener + if (newTypeIter == newSpaceIter->second.end()) { + for (const auto& outdateListener : listenerParts) { + VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << outdateListener.partId_ + << " was removed!"; + listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type); } + listener_->onListenerSpaceRemoved(spaceId, type); + VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed, type is " + << apache::thrift::util::enumNameSafe(type); } else { - std::sort(partEntry.second.begin(), partEntry.second.end()); - std::sort(newPartIter->second.begin(), newPartIter->second.end()); + // remove outdate part of listener of specified type + std::sort(listenerParts.begin(), listenerParts.end()); + std::sort(newTypeIter->second.begin(), newTypeIter->second.end()); std::vector diff; - std::set_difference(partEntry.second.begin(), - partEntry.second.end(), - newPartIter->second.begin(), - newPartIter->second.end(), + std::set_difference(listenerParts.begin(), + listenerParts.end(), + newTypeIter->second.begin(), + newTypeIter->second.end(), std::back_inserter(diff)); - for (const auto& info : diff) { - VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was removed!"; - listener_->onListenerRemoved(spaceId, partId, info.type_); + for (const auto& outdateListener : diff) { + VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << outdateListener.partId_ + << " was removed!"; + listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type); } } } @@ -2929,7 +2939,7 @@ ListenersMap MetaClient::doGetListenersMap(const HostAddr& host, const LocalCach auto partIter = space.second->partsAlloc_.find(partId); if (partIter != space.second->partsAlloc_.end()) { auto peers = partIter->second; - listenersMap[spaceId][partId].emplace_back(std::move(type), std::move(peers)); + listenersMap[spaceId][type].emplace_back(partId, std::move(peers)); } else { FLOG_WARN("%s has listener of [%d, %d], but can't find part peers", host.toString().c_str(), diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index ce0e7cfc381..2278d0db262 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -159,8 +159,8 @@ class MetaChangedListener { public: virtual ~MetaChangedListener() = default; - virtual void onSpaceAdded(GraphSpaceID spaceId, bool isListener = false) = 0; - virtual void onSpaceRemoved(GraphSpaceID spaceId, bool isListener = false) = 0; + virtual void onSpaceAdded(GraphSpaceID spaceId) = 0; + virtual void onSpaceRemoved(GraphSpaceID spaceId) = 0; virtual void onSpaceOptionUpdated( GraphSpaceID spaceId, const std::unordered_map& options) = 0; virtual void onPartAdded(const PartHosts& partHosts) = 0; @@ -169,12 +169,15 @@ class MetaChangedListener { virtual void fetchLeaderInfo( std::unordered_map>& leaders) = 0; virtual void fetchDiskParts(kvstore::SpaceDiskPartsMap& diskParts) = 0; - virtual void onListenerAdded(GraphSpaceID spaceId, - PartitionID partId, - const ListenerHosts& listenerHosts) = 0; - virtual void onListenerRemoved(GraphSpaceID spaceId, - PartitionID partId, - cpp2::ListenerType type) = 0; + virtual void onListenerSpaceAdded(GraphSpaceID spaceId, cpp2::ListenerType type) = 0; + virtual void onListenerSpaceRemoved(GraphSpaceID spaceId, cpp2::ListenerType type) = 0; + virtual void onListenerPartAdded(GraphSpaceID spaceId, + PartitionID partId, + cpp2::ListenerType type, + const std::vector& peers) = 0; + virtual void onListenerPartRemoved(GraphSpaceID spaceId, + PartitionID partId, + cpp2::ListenerType type) = 0; virtual void onCheckRemoteListeners(GraphSpaceID spaceId, PartitionID partId, const std::vector& remoteListeners) = 0; diff --git a/src/common/meta/Common.h b/src/common/meta/Common.h index 3841ada4a2a..42d8da0f038 100644 --- a/src/common/meta/Common.h +++ b/src/common/meta/Common.h @@ -31,21 +31,21 @@ struct PartHosts { // ListenerHosts saves the listener type and the peers of the data replica struct ListenerHosts { - ListenerHosts(cpp2::ListenerType type, std::vector peers) - : type_(std::move(type)), peers_(std::move(peers)) {} + ListenerHosts(PartitionID partId, std::vector peers) + : partId_(partId), peers_(std::move(peers)) {} bool operator==(const ListenerHosts& rhs) const { - return this->type_ == rhs.type_ && this->peers_ == rhs.peers_; + return this->partId_ == rhs.partId_ && this->peers_ == rhs.peers_; } bool operator<(const ListenerHosts& rhs) const { - if (this->type_ == rhs.type_) { + if (this->partId_ == rhs.partId_) { return this->peers_ < rhs.peers_; } - return this->type_ < rhs.type_; + return this->partId_ < rhs.partId_; } - cpp2::ListenerType type_; + PartitionID partId_; // peers is the part peers which would send logs to the listener std::vector peers_; }; @@ -53,7 +53,8 @@ struct ListenerHosts { using PartsMap = std::unordered_map>; // ListenersMap is used for listener replica to get its peers of data replica using ListenersMap = - std::unordered_map>>; + std::unordered_map>>; // RemoteListenerInfo is pair of using RemoteListenerInfo = std::pair; // RemoteListeners is used for data replica to check if some part has remote diff --git a/src/kvstore/Listener.cpp b/src/kvstore/Listener.cpp index bde1a192261..a989e128672 100644 --- a/src/kvstore/Listener.cpp +++ b/src/kvstore/Listener.cpp @@ -26,8 +26,7 @@ Listener::Listener(GraphSpaceID spaceId, std::shared_ptr handlers, std::shared_ptr snapshotMan, std::shared_ptr clientMan, - std::shared_ptr diskMan, - meta::SchemaManager* schemaMan) + std::shared_ptr diskMan) : RaftPart(FLAGS_cluster_id, spaceId, partId, @@ -38,8 +37,7 @@ Listener::Listener(GraphSpaceID spaceId, handlers, snapshotMan, clientMan, - diskMan), - schemaMan_(schemaMan) {} + diskMan) {} void Listener::start(std::vector&& peers, bool) { std::lock_guard g(raftLock_); diff --git a/src/kvstore/Listener.h b/src/kvstore/Listener.h index dfdb6899366..5aa4640c001 100644 --- a/src/kvstore/Listener.h +++ b/src/kvstore/Listener.h @@ -115,8 +115,7 @@ class Listener : public raftex::RaftPart { std::shared_ptr handlers, std::shared_ptr snapshotMan, std::shared_ptr clientMan, - std::shared_ptr diskMan, - meta::SchemaManager* schemaMan); + std::shared_ptr diskMan); /** * @brief Initialize listener, all Listener must call this method @@ -278,7 +277,6 @@ class Listener : public raftex::RaftPart { LogID lastApplyLogId_ = 0; int64_t lastApplyTime_ = 0; std::set peers_; - meta::SchemaManager* schemaMan_{nullptr}; }; } // namespace kvstore diff --git a/src/kvstore/ListenerFactory.h b/src/kvstore/ListenerFactory.h deleted file mode 100644 index 95af6dad1c2..00000000000 --- a/src/kvstore/ListenerFactory.h +++ /dev/null @@ -1,39 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef KVSTORE_LISTENER_FACTORY_H_ -#define KVSTORE_LISTENER_FACTORY_H_ - -#include "kvstore/Listener.h" -#include "kvstore/plugins/elasticsearch/ESListener.h" - -namespace nebula { -namespace kvstore { - -/** - * @brief Factory to build listener - */ -class ListenerFactory { - public: - template - /** - * @brief Create a Listener object - * - * @param type Type of listener - * @param args Other parameters - * @return std::shared_ptr - */ - static std::shared_ptr createListener(meta::cpp2::ListenerType type, Args&&... args) { - if (type == meta::cpp2::ListenerType::ELASTICSEARCH) { - return std::make_shared(std::forward(args)...); - } - LOG(FATAL) << "Should not reach here"; - return nullptr; - } -}; - -} // namespace kvstore -} // namespace nebula -#endif // KVSTORE_LISTENER_FACTORY_H_ diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index e20da833e2e..4fe9e8ff28e 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -17,6 +17,7 @@ #include "common/utils/NebulaKeyUtils.h" #include "kvstore/NebulaSnapshotManager.h" #include "kvstore/RocksEngine.h" +#include "kvstore/plugins/elasticsearch/ESListener.h" DEFINE_string(engine_type, "rocksdb", "rocksdb, memory..."); DEFINE_int32(custom_filter_interval_secs, @@ -291,10 +292,11 @@ void NebulaStore::loadLocalListenerFromPartManager() { auto listenersMap = options_.partMan_->listeners(storeSvcAddr_); for (const auto& spaceEntry : listenersMap) { auto spaceId = spaceEntry.first; - for (const auto& partEntry : spaceEntry.second) { - auto partId = partEntry.first; - for (const auto& listener : partEntry.second) { - addListener(spaceId, partId, std::move(listener.type_), std::move(listener.peers_)); + for (const auto& typeEntry : spaceEntry.second) { + auto type = typeEntry.first; + addListenerSpace(spaceId, type); + for (const auto& info : typeEntry.second) { + addListenerPart(spaceId, info.partId_, type, info.peers_); } } } @@ -361,43 +363,33 @@ ErrorOr NebulaStore::partLeader(GraphSpaceID return getStoreAddr(partIt->second->leader()); } -void NebulaStore::addSpace(GraphSpaceID spaceId, bool isListener) { +void NebulaStore::addSpace(GraphSpaceID spaceId) { folly::RWSpinLock::WriteHolder wh(&lock_); - if (!isListener) { - // Iterate over all engines to ensure that each dataPath has an engine - if (this->spaces_.find(spaceId) != this->spaces_.end()) { - LOG(INFO) << "Data space " << spaceId << " has existed!"; - for (auto& path : options_.dataPaths_) { - bool engineExist = false; - auto dataPath = folly::stringPrintf("%s/nebula/%d", path.c_str(), spaceId); - for (auto iter = spaces_[spaceId]->engines_.begin(); - iter != spaces_[spaceId]->engines_.end(); - iter++) { - auto dPath = (*iter)->getDataRoot(); - if (dataPath.compare(dPath) == 0) { - engineExist = true; - break; - } - } - if (!engineExist) { - spaces_[spaceId]->engines_.emplace_back(newEngine(spaceId, path, options_.walPath_)); + // Iterate over all engines to ensure that each dataPath has an engine + if (this->spaces_.find(spaceId) != this->spaces_.end()) { + LOG(INFO) << "Data space " << spaceId << " has existed!"; + for (auto& path : options_.dataPaths_) { + bool engineExist = false; + auto dataPath = folly::stringPrintf("%s/nebula/%d", path.c_str(), spaceId); + // Check if given data path contain a kv engine of specified spaceId + for (auto iter = spaces_[spaceId]->engines_.begin(); iter != spaces_[spaceId]->engines_.end(); + iter++) { + auto dPath = (*iter)->getDataRoot(); + if (dataPath.compare(dPath) == 0) { + engineExist = true; + break; } } - } else { - LOG(INFO) << "Create data space " << spaceId; - this->spaces_[spaceId] = std::make_unique(); - for (auto& path : options_.dataPaths_) { - this->spaces_[spaceId]->engines_.emplace_back(newEngine(spaceId, path, options_.walPath_)); + if (!engineExist) { + spaces_[spaceId]->engines_.emplace_back(newEngine(spaceId, path, options_.walPath_)); } } } else { - // listener don't need engine for now - if (this->spaceListeners_.find(spaceId) != this->spaceListeners_.end()) { - LOG(INFO) << "Listener space " << spaceId << " has existed!"; - return; + LOG(INFO) << "Create data space " << spaceId; + this->spaces_[spaceId] = std::make_unique(); + for (auto& path : options_.dataPaths_) { + this->spaces_[spaceId]->engines_.emplace_back(newEngine(spaceId, path, options_.walPath_)); } - LOG(INFO) << "Create listener space " << spaceId; - this->spaceListeners_[spaceId] = std::make_unique(); } } @@ -494,55 +486,44 @@ std::shared_ptr NebulaStore::newPart(GraphSpaceID spaceId, return part; } -void NebulaStore::removeSpace(GraphSpaceID spaceId, bool isListener) { +void NebulaStore::removeSpace(GraphSpaceID spaceId) { folly::RWSpinLock::WriteHolder wh(&lock_); if (beforeRemoveSpace_) { beforeRemoveSpace_(spaceId); } - if (!isListener) { - auto spaceIt = this->spaces_.find(spaceId); - if (spaceIt != this->spaces_.end()) { - for (auto& [partId, part] : spaceIt->second->parts_) { - // before calling removeSpace, meta client would call removePart to remove all parts in - // meta cache, which do not contain learners, so we remove them here - if (part->isLearner()) { - removePart(spaceId, partId, false); - } - } - auto& engines = spaceIt->second->engines_; - for (auto& engine : engines) { - auto parts = engine->allParts(); - for (auto& partId : parts) { - engine->removePart(partId); - } - CHECK_EQ(0, engine->totalPartsNum()); + auto spaceIt = this->spaces_.find(spaceId); + if (spaceIt != this->spaces_.end()) { + for (auto& [partId, part] : spaceIt->second->parts_) { + // before calling removeSpace, meta client would call removePart to remove all parts in + // meta cache, which do not contain learners, so we remove them here + if (part->isLearner()) { + removePart(spaceId, partId, false); } - CHECK(spaceIt->second->parts_.empty()); - std::vector enginePaths; - if (FLAGS_auto_remove_invalid_space) { - for (auto& engine : engines) { - enginePaths.emplace_back(engine->getDataRoot()); - } + } + auto& engines = spaceIt->second->engines_; + for (auto& engine : engines) { + auto parts = engine->allParts(); + for (auto& partId : parts) { + engine->removePart(partId); } - this->spaces_.erase(spaceIt); - if (FLAGS_auto_remove_invalid_space) { - for (const auto& path : enginePaths) { - removeSpaceDir(path); - } + CHECK_EQ(0, engine->totalPartsNum()); + } + CHECK(spaceIt->second->parts_.empty()); + std::vector enginePaths; + if (FLAGS_auto_remove_invalid_space) { + for (auto& engine : engines) { + enginePaths.emplace_back(engine->getDataRoot()); } } - LOG(INFO) << "Data space " << spaceId << " has been removed!"; - } else { - auto spaceIt = this->spaceListeners_.find(spaceId); - if (spaceIt != this->spaceListeners_.end()) { - for (const auto& partEntry : spaceIt->second->listeners_) { - CHECK(partEntry.second.empty()); + this->spaces_.erase(spaceIt); + if (FLAGS_auto_remove_invalid_space) { + for (const auto& path : enginePaths) { + removeSpaceDir(path); } - this->spaceListeners_.erase(spaceIt); } - LOG(INFO) << "Listener space " << spaceId << " has been removed!"; } + LOG(INFO) << "Data space " << spaceId << " has been removed!"; } nebula::cpp2::ErrorCode NebulaStore::clearSpace(GraphSpaceID spaceId) { @@ -584,10 +565,35 @@ void NebulaStore::removePart(GraphSpaceID spaceId, PartitionID partId, bool need LOG(INFO) << "Space " << spaceId << ", part " << partId << " has been removed!"; } -void NebulaStore::addListener(GraphSpaceID spaceId, - PartitionID partId, - meta::cpp2::ListenerType type, - const std::vector& peers) { +void NebulaStore::addListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) { + UNUSED(type); + folly::RWSpinLock::WriteHolder wh(&lock_); + // listener don't need engine for now + if (this->spaceListeners_.find(spaceId) != this->spaceListeners_.end()) { + LOG(INFO) << "Listener space " << spaceId << " has existed!"; + return; + } + LOG(INFO) << "Create listener space " << spaceId; + this->spaceListeners_[spaceId] = std::make_unique(); +} + +void NebulaStore::removeListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) { + UNUSED(type); + folly::RWSpinLock::WriteHolder wh(&lock_); + auto spaceIt = this->spaceListeners_.find(spaceId); + if (spaceIt != this->spaceListeners_.end()) { + for (const auto& partEntry : spaceIt->second->listeners_) { + CHECK(partEntry.second.empty()); + } + this->spaceListeners_.erase(spaceIt); + } + LOG(INFO) << "Listener space " << spaceId << " has been removed!"; +} + +void NebulaStore::addListenerPart(GraphSpaceID spaceId, + PartitionID partId, + meta::cpp2::ListenerType type, + const std::vector& peers) { folly::RWSpinLock::WriteHolder wh(&lock_); auto spaceIt = spaceListeners_.find(spaceId); if (spaceIt == spaceListeners_.end()) { @@ -617,18 +623,14 @@ std::shared_ptr NebulaStore::newListener(GraphSpaceID spaceId, folly::stringPrintf("%s/%d/%d/wal", options_.listenerPath_.c_str(), spaceId, partId); // snapshot manager and client manager is set to nullptr, listener should // never use them - auto listener = ListenerFactory::createListener(type, - spaceId, - partId, - raftAddr_, - walPath, - ioPool_, - bgWorkers_, - workers_, - nullptr, - nullptr, - nullptr, - options_.schemaMan_); + std::shared_ptr listener; + if (type == meta::cpp2::ListenerType::ELASTICSEARCH) { + listener = std::make_shared( + spaceId, partId, raftAddr_, walPath, ioPool_, bgWorkers_, workers_, options_.schemaMan_); + } else { + LOG(FATAL) << "Should not reach here"; + return nullptr; + } raftService_->addPartition(listener); // add raft group as learner std::vector raftPeers; @@ -640,9 +642,9 @@ std::shared_ptr NebulaStore::newListener(GraphSpaceID spaceId, return listener; } -void NebulaStore::removeListener(GraphSpaceID spaceId, - PartitionID partId, - meta::cpp2::ListenerType type) { +void NebulaStore::removeListenerPart(GraphSpaceID spaceId, + PartitionID partId, + meta::cpp2::ListenerType type) { folly::RWSpinLock::WriteHolder wh(&lock_); auto spaceIt = spaceListeners_.find(spaceId); if (spaceIt != spaceListeners_.end()) { diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 7a5bff80f1e..3439991ab8f 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -18,7 +18,6 @@ #include "kvstore/KVEngine.h" #include "kvstore/KVStore.h" #include "kvstore/Listener.h" -#include "kvstore/ListenerFactory.h" #include "kvstore/Part.h" #include "kvstore/PartManager.h" #include "kvstore/raftex/RaftexService.h" @@ -544,9 +543,8 @@ class NebulaStore : public KVStore, public Handler { * @brief Add a space, called from part manager * * @param spaceId - * @param isListener Whether the space is listener */ - void addSpace(GraphSpaceID spaceId, bool isListener = false) override; + void addSpace(GraphSpaceID spaceId) override; /** * @brief Add a partition, called from part manager @@ -567,7 +565,7 @@ class NebulaStore : public KVStore, public Handler { * @param spaceId * @param isListener Whether the space is listener */ - void removeSpace(GraphSpaceID spaceId, bool isListener) override; + void removeSpace(GraphSpaceID spaceId) override; /** * @brief clear space data, but not remove the data dirs. @@ -622,6 +620,22 @@ class NebulaStore : public KVStore, public Handler { nebula::cpp2::ErrorCode restoreFromFiles(GraphSpaceID spaceId, const std::vector& files) override; + /** + * @brief Add a space as listener + * + * @param spaceId + * @param type Listener type + */ + void addListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) override; + + /** + * @brief Remove a listener space + * + * @param spaceId + * @param type Listener type + */ + void removeListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) override; + /** * @brief Add a partition as listener * @@ -630,10 +644,10 @@ class NebulaStore : public KVStore, public Handler { * @param type Listener type * @param peers Raft peers of listener */ - void addListener(GraphSpaceID spaceId, - PartitionID partId, - meta::cpp2::ListenerType type, - const std::vector& peers) override; + void addListenerPart(GraphSpaceID spaceId, + PartitionID partId, + meta::cpp2::ListenerType type, + const std::vector& peers) override; /** * @brief Remove a listener partition @@ -642,9 +656,9 @@ class NebulaStore : public KVStore, public Handler { * @param partId * @param type Listener type */ - void removeListener(GraphSpaceID spaceId, - PartitionID partId, - meta::cpp2::ListenerType type) override; + void removeListenerPart(GraphSpaceID spaceId, + PartitionID partId, + meta::cpp2::ListenerType type) override; /** * @brief Check if the partition's listener state has changed, add/remove if necessary diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp index 7508cd65d11..221fdb0538a 100644 --- a/src/kvstore/PartManager.cpp +++ b/src/kvstore/PartManager.cpp @@ -84,15 +84,19 @@ Status MetaServerBasedPartManager::spaceExist(const HostAddr& host, GraphSpaceID return client_->checkSpaceExistInCache(host, spaceId); } -void MetaServerBasedPartManager::onSpaceAdded(GraphSpaceID spaceId, bool isListener) { +void MetaServerBasedPartManager::onSpaceAdded(GraphSpaceID spaceId) { if (handler_ != nullptr) { - handler_->addSpace(spaceId, isListener); + handler_->addSpace(spaceId); + } else { + VLOG(1) << "handler_ is nullptr!"; } } -void MetaServerBasedPartManager::onSpaceRemoved(GraphSpaceID spaceId, bool isListener) { +void MetaServerBasedPartManager::onSpaceRemoved(GraphSpaceID spaceId) { if (handler_ != nullptr) { - handler_->removeSpace(spaceId, isListener); + handler_->removeSpace(spaceId); + } else { + VLOG(1) << "handler_ is nullptr!"; } } @@ -200,19 +204,42 @@ StatusOr> MetaServerBasedPartManager::list return client_->getListenerHostTypeBySpacePartType(spaceId, partId); } -void MetaServerBasedPartManager::onListenerAdded(GraphSpaceID spaceId, - PartitionID partId, - const meta::ListenerHosts& listenerHost) { +void MetaServerBasedPartManager::onListenerSpaceAdded(GraphSpaceID spaceId, + meta::cpp2::ListenerType type) { if (handler_ != nullptr) { - handler_->addListener(spaceId, partId, listenerHost.type_, listenerHost.peers_); + handler_->addListenerSpace(spaceId, type); + } else { + VLOG(1) << "handler_ is nullptr!"; } } -void MetaServerBasedPartManager::onListenerRemoved(GraphSpaceID spaceId, - PartitionID partId, - meta::cpp2::ListenerType type) { +void MetaServerBasedPartManager::onListenerSpaceRemoved(GraphSpaceID spaceId, + meta::cpp2::ListenerType type) { if (handler_ != nullptr) { - handler_->removeListener(spaceId, partId, type); + handler_->removeListenerSpace(spaceId, type); + } else { + VLOG(1) << "handler_ is nullptr!"; + } +} + +void MetaServerBasedPartManager::onListenerPartAdded(GraphSpaceID spaceId, + PartitionID partId, + meta::cpp2::ListenerType type, + const std::vector& peers) { + if (handler_ != nullptr) { + handler_->addListenerPart(spaceId, partId, type, peers); + } else { + VLOG(1) << "handler_ is nullptr!"; + } +} + +void MetaServerBasedPartManager::onListenerPartRemoved(GraphSpaceID spaceId, + PartitionID partId, + meta::cpp2::ListenerType type) { + if (handler_ != nullptr) { + handler_->removeListenerPart(spaceId, partId, type); + } else { + VLOG(1) << "handler_ is nullptr!"; } } diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h index 34d3a57befa..65a1365eece 100644 --- a/src/kvstore/PartManager.h +++ b/src/kvstore/PartManager.h @@ -27,9 +27,8 @@ class Handler { * @brief Add a space * * @param spaceId - * @param isListener Whether the space is listener */ - virtual void addSpace(GraphSpaceID spaceId, bool isListener = false) = 0; + virtual void addSpace(GraphSpaceID spaceId) = 0; /** * @brief Add a partition @@ -59,9 +58,8 @@ class Handler { * @brief Remove a space * * @param spaceId - * @param isListener Whether the space is listener */ - virtual void removeSpace(GraphSpaceID spaceId, bool isListener = false) = 0; + virtual void removeSpace(GraphSpaceID spaceId) = 0; /** * @brief clear space data, but not remove the data dirs. @@ -80,28 +78,46 @@ class Handler { virtual void removePart(GraphSpaceID spaceId, PartitionID partId, bool needLock = true) = 0; /** - * @brief Add a partition as listener + * @brief Add a space to listener + * + * @param spaceId + * @param partId + * @param type Listener type + */ + virtual void addListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) = 0; + + /** + * @brief Remove a space from listener + * + * @param spaceId + * @param partId + * @param type Listener type + */ + virtual void removeListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) = 0; + + /** + * @brief Add a partition to listener * * @param spaceId * @param partId * @param type Listener type * @param peers Raft peers of listener */ - virtual void addListener(GraphSpaceID spaceId, - PartitionID partId, - meta::cpp2::ListenerType type, - const std::vector& peers) = 0; + virtual void addListenerPart(GraphSpaceID spaceId, + PartitionID partId, + meta::cpp2::ListenerType type, + const std::vector& peers) = 0; /** - * @brief Remove a listener partition + * @brief Remove a partition from listener * * @param spaceId * @param partId * @param type Listener type */ - virtual void removeListener(GraphSpaceID spaceId, - PartitionID partId, - meta::cpp2::ListenerType type) = 0; + virtual void removeListenerPart(GraphSpaceID spaceId, + PartitionID partId, + meta::cpp2::ListenerType type) = 0; /** * @brief Check if the partition's listener state has changed, add/remove if necessary @@ -426,9 +442,8 @@ class MetaServerBasedPartManager : public PartManager, public meta::MetaChangedL * @brief Found a new space, call handler's method * * @param spaceId - * @param isListener Whether the space is a listener */ - void onSpaceAdded(GraphSpaceID spaceId, bool isListener) override; + void onSpaceAdded(GraphSpaceID spaceId) override; /** * @brief Found a removed space, call handler's method @@ -436,7 +451,7 @@ class MetaServerBasedPartManager : public PartManager, public meta::MetaChangedL * @param spaceId * @param isListener Whether the space is a listener */ - void onSpaceRemoved(GraphSpaceID spaceId, bool isListener) override; + void onSpaceRemoved(GraphSpaceID spaceId) override; /** * @brief Found space option updated, call handler's methos @@ -485,26 +500,44 @@ class MetaServerBasedPartManager : public PartManager, public meta::MetaChangedL void fetchDiskParts(SpaceDiskPartsMap& diskParts) override; /** - * @brief Found a new listener, call handler's method + * @brief Found a new space of listener, call handler's method + * + * @param spaceId + * @param type Listener type + */ + void onListenerSpaceAdded(GraphSpaceID spaceId, meta::cpp2::ListenerType type) override; + + /** + * @brief Found a removed space of listener, call handler's method + * + * @param spaceId + * @param type Listener type + */ + void onListenerSpaceRemoved(GraphSpaceID spaceId, meta::cpp2::ListenerType type) override; + + /** + * @brief Found a new partition of listener, call handler's method * * @param spaceId * @param partId - * @param listenerHosts Listener's peer + * @param type Listener type + * @param peers The peers of the partition */ - void onListenerAdded(GraphSpaceID spaceId, - PartitionID partId, - const meta::ListenerHosts& listenerHosts) override; + void onListenerPartAdded(GraphSpaceID spaceId, + PartitionID partId, + meta::cpp2::ListenerType type, + const std::vector& peers) override; /** - * @brief Found a removed listener, call handler's method + * @brief Found a removed partition of listener, call handler's method * * @param spaceId * @param partId * @param type Listener type */ - void onListenerRemoved(GraphSpaceID spaceId, - PartitionID partId, - meta::cpp2::ListenerType type) override; + void onListenerPartRemoved(GraphSpaceID spaceId, + PartitionID partId, + meta::cpp2::ListenerType type) override; /** * @brief Check if a parition has remote listeners, add or remove if necessary diff --git a/src/kvstore/plugins/elasticsearch/ESListener.h b/src/kvstore/plugins/elasticsearch/ESListener.h index 0b0819af695..2f82943e8f4 100644 --- a/src/kvstore/plugins/elasticsearch/ESListener.h +++ b/src/kvstore/plugins/elasticsearch/ESListener.h @@ -39,9 +39,6 @@ class ESListener : public Listener { std::shared_ptr ioPool, std::shared_ptr workers, std::shared_ptr handlers, - std::shared_ptr snapshotMan, - std::shared_ptr clientMan, - std::shared_ptr diskMan, meta::SchemaManager* schemaMan) : Listener(spaceId, partId, @@ -50,10 +47,10 @@ class ESListener : public Listener { ioPool, workers, handlers, - snapshotMan, - clientMan, - diskMan, - schemaMan) { + nullptr, + nullptr, + nullptr), + schemaMan_(schemaMan) { CHECK(!!schemaMan); lastApplyLogFile_ = std::make_unique( folly::stringPrintf("%s/last_apply_log_%d", walPath.c_str(), partId)); @@ -177,6 +174,7 @@ class ESListener : public Listener { bool writeDatum(const std::vector& items) const; private: + meta::SchemaManager* schemaMan_{nullptr}; std::unique_ptr lastApplyLogFile_{nullptr}; std::unique_ptr spaceName_{nullptr}; std::vector esClients_; diff --git a/src/kvstore/test/NebulaListenerTest.cpp b/src/kvstore/test/NebulaListenerTest.cpp index 7da68d8f467..9ef87dc40ed 100644 --- a/src/kvstore/test/NebulaListenerTest.cpp +++ b/src/kvstore/test/NebulaListenerTest.cpp @@ -40,8 +40,7 @@ class DummyListener : public Listener { const std::string& walPath, std::shared_ptr ioPool, std::shared_ptr workers, - std::shared_ptr handlers, - meta::SchemaManager* schemaMan) + std::shared_ptr handlers) : Listener(spaceId, partId, localAddr, @@ -51,8 +50,7 @@ class DummyListener : public Listener { handlers, nullptr, nullptr, - nullptr, - schemaMan) {} + nullptr) {} std::vector data() { return data_; @@ -338,8 +336,7 @@ class ListenerBasicTest : public ::testing::TestWithParamioPool_, listeners_[index]->bgWorkers_, - listeners_[index]->workers_, - nullptr); + listeners_[index]->workers_); listeners_[index]->raftService_->addPartition(dummy); std::vector raftPeers; std::transform( diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index c9701fb61d5..d2e94d402ff 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -1149,7 +1149,7 @@ TEST(NebulaStoreTest, RemoveInvalidSpaceTest) { for (auto partId = 1; partId <= 6; partId++) { store->removePart(1, partId); } - store->removeSpace(1, false); + store->removeSpace(1); EXPECT_EQ(1, store->spaces_.size()); CHECK(!boost::filesystem::exists(space1)); CHECK(boost::filesystem::exists(space2)); @@ -1159,7 +1159,7 @@ TEST(NebulaStoreTest, RemoveInvalidSpaceTest) { for (auto partId = 1; partId <= 6; partId++) { store->removePart(2, partId); } - store->removeSpace(2, false); + store->removeSpace(2); EXPECT_EQ(0, store->spaces_.size()); CHECK(!boost::filesystem::exists(space1)); CHECK(boost::filesystem::exists(space2)); diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index 9e36fbb8d86..f79c47b9df7 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -1051,22 +1051,14 @@ TEST(MetaClientTest, FTServiceTest) { class TestListener : public MetaChangedListener { public: virtual ~TestListener() = default; - void onSpaceAdded(GraphSpaceID spaceId, bool isListener) override { + void onSpaceAdded(GraphSpaceID spaceId) override { LOG(INFO) << "Space " << spaceId << " added"; - if (!isListener) { - spaceNum++; - } else { - listenerSpaceNum++; - } + spaceNum++; } - void onSpaceRemoved(GraphSpaceID spaceId, bool isListener) override { + void onSpaceRemoved(GraphSpaceID spaceId) override { LOG(INFO) << "Space " << spaceId << " removed"; - if (!isListener) { - spaceNum--; - } else { - listenerSpaceNum--; - } + spaceNum--; } void onPartAdded(const PartHosts& partMeta) override { @@ -1095,18 +1087,32 @@ class TestListener : public MetaChangedListener { LOG(INFO) << "Get leader distribution!"; } - void onListenerAdded(GraphSpaceID spaceId, - PartitionID partId, - const ListenerHosts& listenerHosts) override { + void onListenerSpaceAdded(GraphSpaceID spaceId, cpp2::ListenerType type) override { + UNUSED(spaceId); + UNUSED(type); + listenerSpaceNum++; + } + + void onListenerSpaceRemoved(GraphSpaceID spaceId, cpp2::ListenerType type) override { + UNUSED(spaceId); + UNUSED(type); + listenerSpaceNum--; + } + + void onListenerPartAdded(GraphSpaceID spaceId, + PartitionID partId, + cpp2::ListenerType type, + const std::vector& peers) override { UNUSED(spaceId); UNUSED(partId); - UNUSED(listenerHosts); + UNUSED(type); + UNUSED(peers); listenerPartNum++; } - void onListenerRemoved(GraphSpaceID spaceId, - PartitionID partId, - cpp2::ListenerType type) override { + void onListenerPartRemoved(GraphSpaceID spaceId, + PartitionID partId, + cpp2::ListenerType type) override { UNUSED(spaceId); UNUSED(partId); UNUSED(type);