Skip to content

Commit

Permalink
Check active zone before create space (vesoft-inc#3822)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
2 people authored and liwenhui-soul committed Feb 15, 2022
1 parent 3f13df6 commit 3ebe0cd
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 19 deletions.
5 changes: 1 addition & 4 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ nebula::cpp2::ErrorCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv,
}
}
// indicate whether any leader info is updated
bool hasUpdate = false;
if (!data.empty()) {
hasUpdate = true;
}
bool hasUpdate = !data.empty();
data.emplace_back(MetaKeyUtils::hostKey(hostAddr.host, hostAddr.port), HostInfo::encodeV2(info));

folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
Expand Down
12 changes: 10 additions & 2 deletions src/meta/processors/parts/CreateSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
return;
}

int32_t activeZoneSize = 0;
std::unordered_map<std::string, Hosts> zoneHosts;
for (auto& zone : zones) {
auto zoneKey = MetaKeyUtils::zoneKey(zone);
Expand All @@ -194,14 +195,14 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
auto key = MetaKeyUtils::hostKey(host.host, host.port);
auto ret = doGet(key);
if (!nebula::ok(ret)) {
code = nebula::error(ret);
LOG(ERROR) << "Get host " << host << " failed.";
break;
continue;
}

HostInfo info = HostInfo::decode(nebula::value(ret));
if (now - info.lastHBTimeInMilliSec_ <
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000) {
activeZoneSize += 1;
auto hostIter = hostLoading_.find(host);
if (hostIter == hostLoading_.end()) {
hostLoading_[host] = 0;
Expand All @@ -218,6 +219,13 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
zoneHosts[zone] = std::move(hosts);
}

if (replicaFactor > activeZoneSize) {
LOG(ERROR) << "Replication number should less than or equal to active zone number.";
handleErrorCode(nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH);
onFinished();
return;
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create space failed";
handleErrorCode(code);
Expand Down
65 changes: 52 additions & 13 deletions src/meta/test/ProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,33 @@ TEST(ProcessorTest, SpaceTest) {
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
{
std::vector<HostAddr> hosts = {{"0", 0}, {"1", 1}, {"2", 2}, {"3", 3}};
TestUtils::registerHB(kv.get(), hosts);
// Attempt to register heartbeat
const ClusterID kClusterId = 10;
for (auto i = 0; i < 4; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr(std::to_string(i), i);
req.cluster_id_ref() = kClusterId;
auto* processor = HBProcessor::instance(kv.get(), nullptr, kClusterId);
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
}
{
cpp2::ListZonesReq req;
auto* processor = ListZonesProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
auto zones = resp.get_zones();
ASSERT_EQ(4, zones.size());
ASSERT_EQ("default_zone_0_0", zones[0].get_zone_name());
ASSERT_EQ("default_zone_1_1", zones[1].get_zone_name());
ASSERT_EQ("default_zone_2_2", zones[2].get_zone_name());
ASSERT_EQ("default_zone_3_3", zones[3].get_zone_name());
}
int32_t hostsNum = 4;
{
Expand Down Expand Up @@ -473,6 +498,31 @@ TEST(ProcessorTest, SpaceTest) {
auto dresp = std::move(df).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, dresp.get_code());
}
{
cpp2::AddHostsReq req;
std::vector<HostAddr> hosts = {{"4", 4}};
req.hosts_ref() = std::move(hosts);
auto* processor = AddHostsProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
{
cpp2::SpaceDesc properties;
properties.space_name_ref() = "default_space";
properties.partition_num_ref() = 8;
properties.replica_factor_ref() = 5;
properties.charset_name_ref() = "utf8";
properties.collate_name_ref() = "utf8_bin";
cpp2::CreateSpaceReq req;
req.properties_ref() = std::move(properties);
auto* processor = CreateSpaceProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH, resp.get_code());
}
}

TEST(ProcessorTest, CreateTagTest) {
Expand Down Expand Up @@ -2569,7 +2619,6 @@ TEST(ProcessorTest, HostsTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -2610,7 +2659,6 @@ TEST(ProcessorTest, HostsTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -2749,7 +2797,6 @@ TEST(ProcessorTest, HostsTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand All @@ -2770,7 +2817,6 @@ TEST(ProcessorTest, AddHostsIntoNewZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -2824,7 +2870,6 @@ TEST(ProcessorTest, AddHostsIntoNewZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -2871,7 +2916,6 @@ TEST(ProcessorTest, AddHostsIntoZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -3028,7 +3072,6 @@ TEST(ProcessorTest, AddHostsIntoZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand All @@ -3049,7 +3092,6 @@ TEST(ProcessorTest, DropHostsTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -3080,7 +3122,6 @@ TEST(ProcessorTest, DropHostsTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -3446,7 +3487,6 @@ TEST(ProcessorTest, RenameZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -3568,7 +3608,6 @@ TEST(ProcessorTest, MergeZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8986; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down

0 comments on commit 3ebe0cd

Please sign in to comment.