From caf08156eb926c8b5b2b837a12fadb0f2cfa7abb Mon Sep 17 00:00:00 2001 From: Sophie <84560950+Sophie-Xie@users.noreply.github.com> Date: Wed, 26 Jan 2022 16:21:39 +0800 Subject: [PATCH] Cherry pick v3.0.0 (0117-0124) (#3820) * drop host only when it registe heartbeat (#3739) * Fix race of create tag and edge (#3735) * fix rece * polish * fix review * Add version for `show hosts` (#3702) * version * tck * pytest * pytest Co-authored-by: cpw <13495049+CPWstatic@users.noreply.github.com> * Update client version to v3.0.0 (#3741) * Update python package name to v3 * Update client version white list * Update go mod to v3 * Change the python package name used in tests * Update client version white list * Fix create fulltext index failed (#3747) * fix bug: the same tagId/edgetype under different spaces, failed to create fulltext indexes * fix bug: the same tagId/edgetype under different spaces, failed to create fulltext indexes * fix writternBy delete by optimizer (#3746) * fix bugs * add tck features * style Co-authored-by: cpw <13495049+CPWstatic@users.noreply.github.com> * fix with & optimizer rule error (#3736) * fix error * fix collapseProjectRule Co-authored-by: cpw <13495049+CPWstatic@users.noreply.github.com> * fix upgrade bug (#3772) Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> * fix 3727 (#3737) Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> * Balance fix (#3668) * fix core balance in zone removing the only host in zone * fix balance plan that make redundant part * fix deadlock when runInternal and finish callback called in the same thread * fix transleader return TERM_OUT_OF_DATE * continue running balance task from last status * fix deadlock when removing part Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> * Add snapshot for prefix iterate. (#3717) * Add snapshot for prefix iterate. * Add header comment * Add happy test * Fix lint * Modify engine snap argument position * Fix other subclass * Fix code Co-authored-by: yaphet <4414314+darionyaphet@users.noreply.github.com> Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> * suppress undesired ASAN error (#3705) * suppress undesired error * fix unstable test case * fix upgrade (#3787) Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> * Create CODEOWNERS (#3770) Confirm with sophie.xie * disable data balance (#3756) Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> * Fix iterator with snapshot(parameter position) (#3785) Co-authored-by: Harris.Chu <1726587+HarrisChu@users.noreply.github.com> Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> Co-authored-by: yaphet <4414314+darionyaphet@users.noreply.github.com> Co-authored-by: jakevin <30525741+jackwener@users.noreply.github.com> Co-authored-by: cpw <13495049+CPWstatic@users.noreply.github.com> Co-authored-by: Yichen Wang <18348405+Aiee@users.noreply.github.com> Co-authored-by: panda-sheep <59197347+panda-sheep@users.noreply.github.com> Co-authored-by: endy.li <25311962+heroicNeZha@users.noreply.github.com> Co-authored-by: jimingquan Co-authored-by: hs.zhang <22708345+cangfengzhs@users.noreply.github.com> Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> Co-authored-by: liwenhui-soul <38217397+liwenhui-soul@users.noreply.github.com> Co-authored-by: Alex Xing <90179377+SuperYoko@users.noreply.github.com> Co-authored-by: Harris.Chu <1726587+HarrisChu@users.noreply.github.com> --- .github/CODEOWNERS | 6 ++ cmake/ThriftGenerate.cmake | 2 +- src/codec/RowReaderV2.h | 2 +- src/common/base/SanitizerOptions.cpp | 4 +- src/common/expression/PropertyExpression.h | 4 + .../executor/admin/ShowHostsExecutor.cpp | 6 +- src/graph/optimizer/OptGroup.cpp | 1 + src/graph/planner/match/MatchPlanner.h | 2 +- src/graph/planner/plan/PlanNode.cpp | 9 ++ src/graph/planner/plan/PlanNode.h | 2 + src/graph/service/GraphFlags.cpp | 2 +- .../service/test/StandAloneTestGraphFlags.cpp | 2 +- src/graph/util/ExpressionUtils.cpp | 24 ++++-- src/graph/util/ExpressionUtils.h | 5 +- src/graph/validator/AdminJobValidator.cpp | 4 + src/graph/validator/MatchValidator.cpp | 11 ++- src/graph/visitor/RewriteVisitor.cpp | 12 +++ src/graph/visitor/RewriteVisitor.h | 2 +- src/interface/common.thrift | 4 +- src/interface/graph.thrift | 2 +- src/interface/meta.thrift | 2 +- src/interface/storage.thrift | 2 +- src/kvstore/KVEngine.h | 24 +++++- src/kvstore/KVStore.h | 50 ++++++++++- src/kvstore/NebulaSnapshotManager.cpp | 20 ++++- src/kvstore/NebulaSnapshotManager.h | 1 + src/kvstore/NebulaStore.cpp | 30 ++++++- src/kvstore/NebulaStore.h | 26 +++++- src/kvstore/PartManager.h | 1 + src/kvstore/RocksEngine.cpp | 16 +++- src/kvstore/RocksEngine.h | 13 ++- src/kvstore/plugins/hbase/HBaseStore.cpp | 4 +- src/kvstore/plugins/hbase/HBaseStore.h | 20 ++++- src/kvstore/raftex/RaftPart.cpp | 57 +++++++------ src/kvstore/raftex/RaftexService.cpp | 16 +++- src/kvstore/test/NebulaStoreTest.cpp | 72 ++++++++++++++++ src/meta/MetaVersionMan.cpp | 4 +- src/meta/processors/Common.h | 3 +- .../admin/VerifyClientVersionProcessor.cpp | 2 +- .../processors/index/FTIndexProcessor.cpp | 6 +- .../processors/job/BalanceJobExecutor.cpp | 30 +++++++ src/meta/processors/job/BalanceJobExecutor.h | 3 + src/meta/processors/job/BalancePlan.cpp | 1 - src/meta/processors/job/BalancePlan.h | 5 ++ src/meta/processors/job/BalanceTask.h | 21 +++++ .../processors/job/DataBalanceJobExecutor.cpp | 58 +++++++------ src/meta/processors/job/JobManager.cpp | 6 +- src/meta/processors/job/JobManager.h | 4 +- .../processors/job/ZoneBalanceJobExecutor.cpp | 29 ++++--- .../processors/job/ZoneBalanceJobExecutor.h | 5 +- .../parts/CreateSpaceAsProcessor.cpp | 6 +- .../processors/schema/AlterEdgeProcessor.cpp | 4 +- .../processors/schema/AlterTagProcessor.cpp | 4 +- .../processors/schema/CreateEdgeProcessor.cpp | 38 ++++----- .../processors/schema/CreateTagProcessor.cpp | 39 ++++----- .../processors/schema/DropEdgeProcessor.cpp | 4 +- .../processors/schema/DropTagProcessor.cpp | 4 +- .../processors/schema/GetEdgeProcessor.cpp | 4 +- .../processors/schema/GetTagProcessor.cpp | 4 +- .../processors/schema/ListEdgesProcessor.cpp | 2 +- .../processors/schema/ListTagsProcessor.cpp | 2 +- .../processors/zone/DivideZoneProcessor.cpp | 26 +++++- .../processors/zone/DropHostsProcessor.cpp | 5 +- src/meta/test/BalancerTest.cpp | 75 ++++++++++++++++- src/meta/test/IndexProcessorTest.cpp | 70 +++++++++++++++- src/meta/test/MetaClientTest.cpp | 50 ++++++++++- src/meta/test/ProcessorTest.cpp | 83 ++++++++++++++++++- src/meta/test/TestUtils.h | 45 +++++++++- src/storage/test/IndexTestUtil.h | 19 ++++- .../test/StorageHttpPropertyHandlerTest.cpp | 18 +--- src/tools/db-upgrade/DbUpgrader.cpp | 8 +- src/tools/db-upgrade/NebulaKeyUtilsV3.cpp | 2 +- tests/admin/test_permission.py | 2 +- tests/admin/test_show_hosts.py | 6 +- tests/common/comparator.py | 2 +- tests/common/dataset_printer.py | 2 +- tests/common/nebula_service.py | 4 +- tests/common/nebula_test_suite.py | 8 +- tests/common/path_value.py | 2 +- tests/common/utils.py | 8 +- tests/conftest.py | 10 +-- tests/job/test_session.py | 14 ++-- tests/query/stateless/test_admin.py | 2 + tests/query/stateless/test_if_exists.py | 2 +- tests/query/stateless/test_keyword.py | 2 +- tests/query/stateless/test_range.py | 2 +- tests/tck/conftest.py | 10 +-- tests/tck/features/admin/Hosts.feature | 4 +- .../features/match/MultiQueryParts.feature | 11 +++ tests/tck/features/match/With.feature | 49 +++++++++++ .../optimizer/CollapseProjectRule.feature | 17 ++++ tests/tck/features/parser/Example.feature | 4 +- tests/tck/steps/conftest.py | 2 +- tests/tck/utils/nbv.py | 2 +- tests/tck/utils/table.py | 2 +- 95 files changed, 1048 insertions(+), 268 deletions(-) create mode 100644 .github/CODEOWNERS diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 00000000000..637adf895a1 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,6 @@ +#Require an approved review in PRs including files with a designated code owner. +/conf/ @vesoft-inc/tech-committee-reviewers +/src/kvstore/raftex/ @critical27 @sherman-the-tank +/cmake/ @sherman-the-tank @yixinglu +*.thrift @vesoft-inc/tech-committee-reviewers +*.yy @CPWstatic diff --git a/cmake/ThriftGenerate.cmake b/cmake/ThriftGenerate.cmake index d82af8ef678..353a33c403b 100644 --- a/cmake/ThriftGenerate.cmake +++ b/cmake/ThriftGenerate.cmake @@ -86,7 +86,7 @@ add_custom_command( --gen "js:node:" --gen "csharp" --gen "java:hashcode" - --gen "go:thrift_import=github.com/facebook/fbthrift/thrift/lib/go/thrift,package_prefix=github.com/vesoft-inc/nebula-go/v2/,use_context" + --gen "go:thrift_import=github.com/facebook/fbthrift/thrift/lib/go/thrift,package_prefix=github.com/vesoft-inc/nebula-go/v3/,use_context" -o "." "${file_path}/${file_name}.thrift" COMMAND mkdir -p "./gen-rust/${file_name}" diff --git a/src/codec/RowReaderV2.h b/src/codec/RowReaderV2.h index 1785ad8dab9..9bad0bf7f05 100644 --- a/src/codec/RowReaderV2.h +++ b/src/codec/RowReaderV2.h @@ -26,7 +26,7 @@ class RowReaderV2 : public RowReader { FRIEND_TEST(ScanEdgePropBench, ProcessEdgeProps); public: - virtual ~RowReaderV2() = default; + ~RowReaderV2() override = default; Value getValueByName(const std::string& prop) const noexcept override; Value getValueByIndex(const int64_t index) const noexcept override; diff --git a/src/common/base/SanitizerOptions.cpp b/src/common/base/SanitizerOptions.cpp index 23720344154..7449a3909e7 100644 --- a/src/common/base/SanitizerOptions.cpp +++ b/src/common/base/SanitizerOptions.cpp @@ -21,7 +21,9 @@ const char* __asan_default_options() { "fast_unwind_on_malloc=0 \n" "detect_stack_use_after_return=1 \n" "alloc_dealloc_mismatch=1 \n" - "new_delete_type_mismatch=1 \n" + // todo(doodle): Reopen when https://github.com/vesoft-inc/nebula/issues/3690 + // addressed throughly + "new_delete_type_mismatch=0 \n" "strict_init_order=1 \n" "intercept_tls_get_addr=1 \n" "symbolize_inline_frames=1 \n" diff --git a/src/common/expression/PropertyExpression.h b/src/common/expression/PropertyExpression.h index 26cf1cee943..a9baf8d47a7 100644 --- a/src/common/expression/PropertyExpression.h +++ b/src/common/expression/PropertyExpression.h @@ -155,6 +155,10 @@ class LabelTagPropertyExpression final : public PropertyExpression { return label_; } + void setLabel(Expression* label) { + label_ = label; + } + private: LabelTagPropertyExpression(ObjectPool* pool, Expression* label = nullptr, diff --git a/src/graph/executor/admin/ShowHostsExecutor.cpp b/src/graph/executor/admin/ShowHostsExecutor.cpp index 84d4a85bb88..f02a8acabed 100644 --- a/src/graph/executor/admin/ShowHostsExecutor.cpp +++ b/src/graph/executor/admin/ShowHostsExecutor.cpp @@ -30,7 +30,8 @@ folly::Future ShowHostsExecutor::showHosts() { "Status", "Leader count", "Leader distribution", - "Partition distribution"}); + "Partition distribution", + "Version"}); std::map leaderPartsCount; std::map allPartsCount; @@ -82,6 +83,7 @@ folly::Future ShowHostsExecutor::showHosts() { r.emplace_back(leaderCount); r.emplace_back(leaders.str()); r.emplace_back(parts.str()); + r.emplace_back(host.version_ref().has_value() ? Value(*host.version_ref()) : Value()); v.emplace_back(std::move(r)); } // row loop { @@ -148,7 +150,7 @@ folly::Future ShowHostsExecutor::showHosts() { LOG(ERROR) << resp.status(); return resp.status(); } - auto value = std::move(resp).value(); + auto value = std::forward(resp).value(); if (type == meta::cpp2::ListHostType::ALLOC) { return finish(makeTraditionalResult(value)); } diff --git a/src/graph/optimizer/OptGroup.cpp b/src/graph/optimizer/OptGroup.cpp index 28337516eb8..eb73f9d4472 100644 --- a/src/graph/optimizer/OptGroup.cpp +++ b/src/graph/optimizer/OptGroup.cpp @@ -45,6 +45,7 @@ void OptGroup::addGroupNode(OptGroupNode *groupNode) { DCHECK(groupNode != nullptr); DCHECK(groupNode->group() == this); groupNodes_.emplace_back(groupNode); + groupNode->node()->updateSymbols(); } OptGroupNode *OptGroup::makeGroupNode(PlanNode *node) { diff --git a/src/graph/planner/match/MatchPlanner.h b/src/graph/planner/match/MatchPlanner.h index dfe55c43362..0017ee66db3 100644 --- a/src/graph/planner/match/MatchPlanner.h +++ b/src/graph/planner/match/MatchPlanner.h @@ -14,7 +14,7 @@ namespace graph { class MatchPlanner final : public Planner { public: static std::unique_ptr make() { - return std::unique_ptr(new MatchPlanner()); + return std::make_unique(); } static bool match(AstContext* astCtx); diff --git a/src/graph/planner/plan/PlanNode.cpp b/src/graph/planner/plan/PlanNode.cpp index abe09aaa9b1..77ceecacf8d 100644 --- a/src/graph/planner/plan/PlanNode.cpp +++ b/src/graph/planner/plan/PlanNode.cpp @@ -370,6 +370,15 @@ void PlanNode::releaseSymbols() { } } +void PlanNode::updateSymbols() { + auto symTbl = qctx_->symTable(); + for (auto out : outputVars_) { + if (out != nullptr) { + symTbl->updateWrittenBy(out->name, out->name, this); + } + } +} + std::ostream& operator<<(std::ostream& os, PlanNode::Kind kind) { os << PlanNode::toString(kind); return os; diff --git a/src/graph/planner/plan/PlanNode.h b/src/graph/planner/plan/PlanNode.h index 0d54422c0ca..4db19bdadf0 100644 --- a/src/graph/planner/plan/PlanNode.h +++ b/src/graph/planner/plan/PlanNode.h @@ -269,6 +269,8 @@ class PlanNode { void releaseSymbols(); + void updateSymbols(); + static const char* toString(Kind kind); std::string toString() const; diff --git a/src/graph/service/GraphFlags.cpp b/src/graph/service/GraphFlags.cpp index 32fd80f2c14..532174509b4 100644 --- a/src/graph/service/GraphFlags.cpp +++ b/src/graph/service/GraphFlags.cpp @@ -56,7 +56,7 @@ DEFINE_bool(enable_optimizer, false, "Whether to enable optimizer"); DEFINE_uint32(ft_request_retry_times, 3, "Retry times if fulltext request failed"); DEFINE_bool(enable_client_white_list, true, "Turn on/off the client white list."); DEFINE_string(client_white_list, - nebula::getOriginVersion() + ":2.5.0:2.5.1:2.6.0", + nebula::getOriginVersion() + ":3.0.0", "A white list for different client versions, separate with colon."); #endif diff --git a/src/graph/service/test/StandAloneTestGraphFlags.cpp b/src/graph/service/test/StandAloneTestGraphFlags.cpp index de8de4c59d5..f53f1b44453 100644 --- a/src/graph/service/test/StandAloneTestGraphFlags.cpp +++ b/src/graph/service/test/StandAloneTestGraphFlags.cpp @@ -9,5 +9,5 @@ DEFINE_uint32(ft_request_retry_times, 3, "Retry times if fulltext request failed"); DEFINE_bool(enable_client_white_list, true, "Turn on/off the client white list."); DEFINE_string(client_white_list, - nebula::getOriginVersion() + ":2.5.0:2.5.1:2.6.0", + nebula::getOriginVersion() + ":3.0.0", "A white list for different client versions, separate with colon."); diff --git a/src/graph/util/ExpressionUtils.cpp b/src/graph/util/ExpressionUtils.cpp index 415532cddd2..4f7a2bed726 100644 --- a/src/graph/util/ExpressionUtils.cpp +++ b/src/graph/util/ExpressionUtils.cpp @@ -126,18 +126,24 @@ bool ExpressionUtils::isEvaluableExpr(const Expression *expr, const QueryContext } // rewrite Attribute to LabelTagProp -Expression *ExpressionUtils::rewriteAttr2LabelTagProp(const Expression *expr) { +Expression *ExpressionUtils::rewriteAttr2LabelTagProp( + const Expression *expr, const std::unordered_map &aliasTypeMap) { ObjectPool *pool = expr->getObjPool(); - auto matcher = [](const Expression *e) -> bool { - if (e->kind() == Expression::Kind::kAttribute) { - auto attrExpr = static_cast(e); - if (attrExpr->left()->kind() == Expression::Kind::kLabelAttribute && - attrExpr->right()->kind() == Expression::Kind::kConstant) { - return true; - } + auto matcher = [&aliasTypeMap](const Expression *e) -> bool { + if (e->kind() != Expression::Kind::kAttribute) { + return false; } - return false; + auto attrExpr = static_cast(e); + if (attrExpr->left()->kind() != Expression::Kind::kLabelAttribute) { + return false; + } + auto label = static_cast(attrExpr->left())->left()->name(); + auto iter = aliasTypeMap.find(label); + if (iter == aliasTypeMap.end() || iter->second != AliasType::kNode) { + return false; + } + return true; }; auto rewriter = [pool](const Expression *e) -> Expression * { diff --git a/src/graph/util/ExpressionUtils.h b/src/graph/util/ExpressionUtils.h index f42241f348b..6f435fdf386 100644 --- a/src/graph/util/ExpressionUtils.h +++ b/src/graph/util/ExpressionUtils.h @@ -15,6 +15,7 @@ #include "common/expression/PropertyExpression.h" #include "common/expression/TypeCastingExpression.h" #include "common/expression/UnaryExpression.h" +#include "graph/context/ast/CypherAstContext.h" #include "graph/visitor/EvaluableExprVisitor.h" #include "graph/visitor/FindVisitor.h" #include "graph/visitor/RewriteVisitor.h" @@ -22,7 +23,6 @@ namespace nebula { class ObjectPool; namespace graph { - class ExpressionUtils { public: explicit ExpressionUtils(...) = delete; @@ -55,7 +55,8 @@ class ExpressionUtils { static bool isEvaluableExpr(const Expression* expr, const QueryContext* qctx = nullptr); - static Expression* rewriteAttr2LabelTagProp(const Expression* expr); + static Expression* rewriteAttr2LabelTagProp( + const Expression* expr, const std::unordered_map& aliasTypeMap); static Expression* rewriteLabelAttr2TagProp(const Expression* expr); diff --git a/src/graph/validator/AdminJobValidator.cpp b/src/graph/validator/AdminJobValidator.cpp index abb5f8382de..cd489587ab0 100644 --- a/src/graph/validator/AdminJobValidator.cpp +++ b/src/graph/validator/AdminJobValidator.cpp @@ -11,6 +11,10 @@ namespace nebula { namespace graph { Status AdminJobValidator::validateImpl() { + if (sentence_->getCmd() == meta::cpp2::AdminCmd::DATA_BALANCE || + sentence_->getCmd() == meta::cpp2::AdminCmd::ZONE_BALANCE) { + return Status::SemanticError("Data balance not support"); + } if (sentence_->getOp() == meta::cpp2::AdminJobOp::ADD) { auto cmd = sentence_->getCmd(); if (requireSpace()) { diff --git a/src/graph/validator/MatchValidator.cpp b/src/graph/validator/MatchValidator.cpp index 2f704d8e53d..b079c3e7709 100644 --- a/src/graph/validator/MatchValidator.cpp +++ b/src/graph/validator/MatchValidator.cpp @@ -273,7 +273,8 @@ Status MatchValidator::validateFilter(const Expression *filter, auto transformRes = ExpressionUtils::filterTransform(filter); NG_RETURN_IF_ERROR(transformRes); // rewrite Attribute to LabelTagProperty - whereClauseCtx.filter = ExpressionUtils::rewriteAttr2LabelTagProp(transformRes.value()); + whereClauseCtx.filter = ExpressionUtils::rewriteAttr2LabelTagProp( + transformRes.value(), whereClauseCtx.aliasesAvailable); auto typeStatus = deduceExprType(whereClauseCtx.filter); NG_RETURN_IF_ERROR(typeStatus); @@ -383,7 +384,8 @@ Status MatchValidator::validateReturn(MatchReturn *ret, ExpressionUtils::hasAny(column->expr(), {Expression::Kind::kAggregate})) { retClauseCtx.yield->hasAgg_ = true; } - column->setExpr(ExpressionUtils::rewriteAttr2LabelTagProp(column->expr())); + column->setExpr(ExpressionUtils::rewriteAttr2LabelTagProp( + column->expr(), retClauseCtx.yield->aliasesAvailable)); exprs.push_back(column->expr()); columns->addColumn(column->clone().release()); } @@ -459,7 +461,8 @@ Status MatchValidator::validateWith(const WithClause *with, } if (with->returnItems()->columns()) { for (auto *column : with->returnItems()->columns()->columns()) { - column->setExpr(ExpressionUtils::rewriteAttr2LabelTagProp(column->expr())); + column->setExpr(ExpressionUtils::rewriteAttr2LabelTagProp( + column->expr(), withClauseCtx.yield->aliasesAvailable)); columns->addColumn(column->clone().release()); } } @@ -819,7 +822,7 @@ Status MatchValidator::checkAlias( auto name = static_cast(labelExpr)->prop(); auto res = getAliasType(aliasesAvailable, name); NG_RETURN_IF_ERROR(res); - if (res.value() != AliasType::kNode) { + if (res.value() == AliasType::kEdge || res.value() == AliasType::kPath) { return Status::SemanticError("The type of `%s' should be tag", name.c_str()); } return Status::OK(); diff --git a/src/graph/visitor/RewriteVisitor.cpp b/src/graph/visitor/RewriteVisitor.cpp index 3da631f210b..da207c2dbee 100644 --- a/src/graph/visitor/RewriteVisitor.cpp +++ b/src/graph/visitor/RewriteVisitor.cpp @@ -237,6 +237,18 @@ void RewriteVisitor::visit(PathBuildExpression *expr) { } } +void RewriteVisitor::visit(LabelTagPropertyExpression *expr) { + if (!care(expr->kind())) { + return; + } + auto label = expr->label(); + if (matcher_(label)) { + expr->setLabel(rewriter_(label)); + } else { + label->accept(this); + } +} + void RewriteVisitor::visit(AttributeExpression *expr) { if (!care(expr->kind())) { return; diff --git a/src/graph/visitor/RewriteVisitor.h b/src/graph/visitor/RewriteVisitor.h index 365908ed288..85d30836495 100644 --- a/src/graph/visitor/RewriteVisitor.h +++ b/src/graph/visitor/RewriteVisitor.h @@ -67,12 +67,12 @@ class RewriteVisitor final : public ExprVisitorImpl { void visit(RelationalExpression*) override; void visit(SubscriptExpression*) override; void visit(PathBuildExpression*) override; + void visit(LabelTagPropertyExpression*) override; void visit(SubscriptRangeExpression*) override; void visit(ConstantExpression*) override {} void visit(LabelExpression*) override {} void visit(UUIDExpression*) override {} void visit(LabelAttributeExpression*) override {} - void visit(LabelTagPropertyExpression*) override {} void visit(VariableExpression*) override {} void visit(VersionedVariableExpression*) override {} void visit(TagPropertyExpression*) override {} diff --git a/src/interface/common.thrift b/src/interface/common.thrift index 3075f3bd051..84e70effdcc 100644 --- a/src/interface/common.thrift +++ b/src/interface/common.thrift @@ -10,7 +10,7 @@ namespace java com.vesoft.nebula namespace go nebula namespace js nebula namespace csharp nebula -namespace py nebula2.common +namespace py nebula3.common cpp_include "common/thrift/ThriftTypes.h" cpp_include "common/datatypes/DateOps-inl.h" @@ -34,7 +34,7 @@ cpp_include "common/datatypes/DurationOps-inl.h" * */ -const binary (cpp.type = "char const *") version = "2.6.0" +const binary (cpp.type = "char const *") version = "3.0.0" typedef i64 (cpp.type = "nebula::ClusterID") ClusterID typedef i32 (cpp.type = "nebula::GraphSpaceID") GraphSpaceID diff --git a/src/interface/graph.thrift b/src/interface/graph.thrift index bfdad9cd1e1..d8bc4156158 100644 --- a/src/interface/graph.thrift +++ b/src/interface/graph.thrift @@ -9,7 +9,7 @@ namespace java com.vesoft.nebula.graph namespace go nebula.graph namespace js nebula.graph namespace csharp nebula.graph -namespace py nebula2.graph +namespace py nebula3.graph include "common.thrift" diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index b512c94ec11..1be7a816917 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -8,7 +8,7 @@ namespace java com.vesoft.nebula.meta namespace go nebula.meta namespace js nebula.meta namespace csharp nebula.meta -namespace py nebula2.meta +namespace py nebula3.meta include "common.thrift" diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 1ff18fd1cd8..9ea758e82ec 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -9,7 +9,7 @@ namespace java com.vesoft.nebula.storage namespace go nebula.storage namespace csharp nebula.storage namespace js nebula.storage -namespace py nebula2.storage +namespace py nebula3.storage include "common.thrift" include "meta.thrift" diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index d178c056f68..dd9364a3de2 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -49,6 +49,18 @@ class KVEngine { bool sync, bool wait) = 0; + /** + * @brief Get the Snapshot from kv engine. + * + * @return const void* snapshot pointer. + */ + virtual const void* GetSnapshot() = 0; + /** + * @brief Release snapshot from kv engine. + * + * @param snapshot + */ + virtual void ReleaseSnapshot(const void* snapshot) = 0; // Read a single key virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0; @@ -62,9 +74,17 @@ class KVEngine { const std::string& end, std::unique_ptr* iter) = 0; - // Get all results with 'prefix' str as prefix. + /** + * @brief Get all results with 'prefix' str as prefix. + * + * @param prefix Prefix string. + * @param snapshot Snapshot from kv engine. nullptr means no snapshot. + * @param iter Iterator for this prefix range. + * @return nebula::cpp2::ErrorCode + */ virtual nebula::cpp2::ErrorCode prefix(const std::string& prefix, - std::unique_ptr* iter) = 0; + std::unique_ptr* iter, + const void* snapshot = nullptr) = 0; // Get all results with 'prefix' str as prefix starting form 'start' virtual nebula::cpp2::ErrorCode rangeWithPrefix(const std::string& start, diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index dcda46ae7fd..0a21920979a 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -78,6 +78,26 @@ class KVStore { return nullptr; } + /** + * @brief Get the Snapshot object + * + * @param spaceId Space id + * @param partID Partition id + * @param canReadFromFollower Flag can read from follower. + * @return const void* Snapshot. + */ + virtual const void* GetSnapshot(GraphSpaceID spaceId, + PartitionID partID, + bool canReadFromFollower = false) = 0; + /** + * @brief Release snapshot. + * + * @param spaceId Space id. + * @param partId Partition id. + * @param snapshot Snapshot to release. + */ + virtual void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) = 0; + // Read a single key virtual nebula::cpp2::ErrorCode get(GraphSpaceID spaceId, PartitionID partId, @@ -113,19 +133,41 @@ class KVStore { std::unique_ptr* iter, bool canReadFromFollower = false) = delete; - // Get all results with prefix. + /** + * @brief Get all results with prefix. + * + * @param spaceId + * @param partId + * @param prefix + * @param iter + * @param canReadFromFollower + * @param snapshot If set, read from snapshot. + * @return nebula::cpp2::ErrorCode + */ virtual nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) = 0; + bool canReadFromFollower = false, + const void* snapshot = nullptr) = 0; - // To forbid to pass rvalue via the `prefix' parameter. + /** + * @brief To forbid to pass rvalue via the `prefix' parameter. + * + * @param spaceId + * @param partId + * @param prefix + * @param iter + * @param canReadFromFollower + * @param snapshot + * @return nebula::cpp2::ErrorCode + */ virtual nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId, PartitionID partId, std::string&& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) = delete; + bool canReadFromFollower = false, + const void* snapshot = nullptr) = delete; // Get all results with prefix starting from start virtual nebula::cpp2::ErrorCode rangeWithPrefix(GraphSpaceID spaceId, diff --git a/src/kvstore/NebulaSnapshotManager.cpp b/src/kvstore/NebulaSnapshotManager.cpp index 3103840a049..a59d7e8ee46 100644 --- a/src/kvstore/NebulaSnapshotManager.cpp +++ b/src/kvstore/NebulaSnapshotManager.cpp @@ -42,8 +42,23 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId, FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_batch_size); + const void* snapshot = store_->GetSnapshot(spaceId, partId); + SCOPE_EXIT { + if (snapshot != nullptr) { + store_->ReleaseSnapshot(spaceId, partId, snapshot); + } + }; + for (const auto& prefix : tables) { - if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize, rateLimiter.get())) { + if (!accessTable(spaceId, + partId, + snapshot, + prefix, + cb, + data, + totalCount, + totalSize, + rateLimiter.get())) { return; } } @@ -54,6 +69,7 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId, // peers. If send failed, will return false. bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, PartitionID partId, + const void* snapshot, const std::string& prefix, raftex::SnapshotCallback& cb, std::vector& data, @@ -61,7 +77,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, int64_t& totalSize, kvstore::RateLimiter* rateLimiter) { std::unique_ptr iter; - auto ret = store_->prefix(spaceId, partId, prefix, &iter); + auto ret = store_->prefix(spaceId, partId, prefix, &iter, false, snapshot); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed" << ", error code:" << static_cast(ret); diff --git a/src/kvstore/NebulaSnapshotManager.h b/src/kvstore/NebulaSnapshotManager.h index 0fa0b4933a7..8dca397619b 100644 --- a/src/kvstore/NebulaSnapshotManager.h +++ b/src/kvstore/NebulaSnapshotManager.h @@ -26,6 +26,7 @@ class NebulaSnapshotManager : public raftex::SnapshotManager { private: bool accessTable(GraphSpaceID spaceId, PartitionID partId, + const void* snapshot, const std::string& prefix, raftex::SnapshotCallback& cb, std::vector& data, diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index b6be72e3209..43623e22ef0 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -589,6 +589,31 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId, return part->engine()->get(key, value); } +const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId, + PartitionID partId, + bool canReadFromFollower) { + auto ret = part(spaceId, partId); + if (!ok(ret)) { + return nullptr; + } + auto part = nebula::value(ret); + if (!checkLeader(part, canReadFromFollower)) { + return nullptr; + } + return part->engine()->GetSnapshot(); +} + +void NebulaStore::ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) { + auto ret = part(spaceId, partId); + if (!ok(ret)) { + LOG(INFO) << "Failed to release snapshot for GraphSpaceID " << spaceId << " PartitionID" + << partId; + return; + } + auto part = nebula::value(ret); + return part->engine()->ReleaseSnapshot(snapshot); +} + std::pair> NebulaStore::multiGet( GraphSpaceID spaceId, PartitionID partId, @@ -634,7 +659,8 @@ nebula::cpp2::ErrorCode NebulaStore::prefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower) { + bool canReadFromFollower, + const void* snapshot) { auto ret = part(spaceId, partId); if (!ok(ret)) { return error(ret); @@ -643,7 +669,7 @@ nebula::cpp2::ErrorCode NebulaStore::prefix(GraphSpaceID spaceId, if (!checkLeader(part, canReadFromFollower)) { return nebula::cpp2::ErrorCode::E_LEADER_CHANGED; } - return part->engine()->prefix(prefix, iter); + return part->engine()->prefix(prefix, iter, snapshot); } nebula::cpp2::ErrorCode NebulaStore::rangeWithPrefix(GraphSpaceID spaceId, diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 720bc8c2ee9..985d6e52bd5 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -123,6 +123,26 @@ class NebulaStore : public KVStore, public Handler { return options_.dataPaths_; } + /** + * @brief Get the Snapshot from engine. + * + * @param spaceId + * @param partID + * @param canReadFromFollower + * @return const void* Snapshot pointer. + */ + const void* GetSnapshot(GraphSpaceID spaceId, + PartitionID partID, + bool canReadFromFollower = false) override; + /** + * @brief Release snapshot from engine. + * + * @param spaceId + * @param partId + * @param snapshot + */ + void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) override; + nebula::cpp2::ErrorCode get(GraphSpaceID spaceId, PartitionID partId, const std::string& key, @@ -157,14 +177,16 @@ class NebulaStore : public KVStore, public Handler { PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) override; + bool canReadFromFollower = false, + const void* snapshot = nullptr) override; // Delete the overloading with a rvalue `prefix' nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId, PartitionID partId, std::string&& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) override = delete; + bool canReadFromFollower = false, + const void* snapshot = nullptr) override = delete; // Get all results with prefix starting from start nebula::cpp2::ErrorCode rangeWithPrefix(GraphSpaceID spaceId, diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h index a04644a068d..0f7b8b80c1c 100644 --- a/src/kvstore/PartManager.h +++ b/src/kvstore/PartManager.h @@ -110,6 +110,7 @@ class MemPartManager final : public PartManager { FRIEND_TEST(NebulaStoreTest, TransLeaderTest); FRIEND_TEST(NebulaStoreTest, CheckpointTest); FRIEND_TEST(NebulaStoreTest, ThreeCopiesCheckpointTest); + FRIEND_TEST(NebulaStoreTest, ReadSnapshotTest); FRIEND_TEST(NebulaStoreTest, AtomicOpBatchTest); FRIEND_TEST(NebulaStoreTest, RemoveInvalidSpaceTest); FRIEND_TEST(NebulaStoreTest, BackupRestoreTest); diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index fd9c1a03851..6a5ef04baf6 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -213,19 +213,24 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start, } nebula::cpp2::ErrorCode RocksEngine::prefix(const std::string& prefix, - std::unique_ptr* storageIter) { + std::unique_ptr* storageIter, + const void* snapshot) { // In fact, we don't need to check prefix.size() >= extractorLen_, which is caller's duty to make // sure the prefix bloom filter exists. But this is quite error-prone, so we do a check here. if (FLAGS_enable_rocksdb_prefix_filtering && prefix.size() >= extractorLen_) { - return prefixWithExtractor(prefix, storageIter); + return prefixWithExtractor(prefix, snapshot, storageIter); } else { - return prefixWithoutExtractor(prefix, storageIter); + return prefixWithoutExtractor(prefix, snapshot, storageIter); } } nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& prefix, + const void* snapshot, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; + if (snapshot != nullptr) { + options.snapshot = reinterpret_cast(snapshot); + } options.prefix_same_as_start = true; rocksdb::Iterator* iter = db_->NewIterator(options); if (iter) { @@ -236,8 +241,11 @@ nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& pref } nebula::cpp2::ErrorCode RocksEngine::prefixWithoutExtractor( - const std::string& prefix, std::unique_ptr* storageIter) { + const std::string& prefix, const void* snapshot, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; + if (snapshot != nullptr) { + options.snapshot = reinterpret_cast(snapshot); + } // prefix_same_as_start is false by default options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering; rocksdb::Iterator* iter = db_->NewIterator(options); diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 66bf1c358cd..16f5ba0216f 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -146,6 +146,14 @@ class RocksEngine : public KVEngine { return walPath_.c_str(); } + const void* GetSnapshot() override { + return db_->GetSnapshot(); + } + + void ReleaseSnapshot(const void* snapshot) override { + db_->ReleaseSnapshot(reinterpret_cast(snapshot)); + } + std::unique_ptr startBatchWrite() override; nebula::cpp2::ErrorCode commitBatchWrite(std::unique_ptr batch, @@ -166,16 +174,19 @@ class RocksEngine : public KVEngine { std::unique_ptr* iter) override; nebula::cpp2::ErrorCode prefix(const std::string& prefix, - std::unique_ptr* iter) override; + std::unique_ptr* iter, + const void* snapshot = nullptr) override; nebula::cpp2::ErrorCode rangeWithPrefix(const std::string& start, const std::string& prefix, std::unique_ptr* iter) override; nebula::cpp2::ErrorCode prefixWithExtractor(const std::string& prefix, + const void* snapshot, std::unique_ptr* storageIter); nebula::cpp2::ErrorCode prefixWithoutExtractor(const std::string& prefix, + const void* snapshot, std::unique_ptr* storageIter); nebula::cpp2::ErrorCode scan(std::unique_ptr* storageIter) override; diff --git a/src/kvstore/plugins/hbase/HBaseStore.cpp b/src/kvstore/plugins/hbase/HBaseStore.cpp index 78784ef57cb..5d05dee490d 100644 --- a/src/kvstore/plugins/hbase/HBaseStore.cpp +++ b/src/kvstore/plugins/hbase/HBaseStore.cpp @@ -292,9 +292,11 @@ ResultCode HBaseStore::prefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower) { + bool canReadFromFollower, + const void* snapshot) { UNUSED(partId); UNUSED(canReadFromFollower); + UNUSED(snapshot); return this->prefix(spaceId, prefix, iter); } diff --git a/src/kvstore/plugins/hbase/HBaseStore.h b/src/kvstore/plugins/hbase/HBaseStore.h index a8d628758c0..7b085be7fb0 100644 --- a/src/kvstore/plugins/hbase/HBaseStore.h +++ b/src/kvstore/plugins/hbase/HBaseStore.h @@ -75,6 +75,20 @@ class HBaseStore : public KVStore { return {-1, -1}; } + const void* GetSnapshot(GraphSpaceID spaceId, + PartitionID partID, + bool canReadFromFollower = false) override { + UNUSED(spaceId); + UNUSED(partID); + UNUSED(canReadFromFollower); + return nullptr; + } + void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) override { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(snapshot); + return; + } ResultCode get(GraphSpaceID spaceId, PartitionID partId, const std::string& key, @@ -110,14 +124,16 @@ class HBaseStore : public KVStore { PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) override; + bool canReadFromFollower = false, + const void* snapshot = nullptr) override; // To forbid to pass rvalue via the `prefix' parameter. ResultCode prefix(GraphSpaceID spaceId, PartitionID partId, std::string&& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) override = delete; + bool canReadFromFollower = false, + const void* snapshot = nullptr) override = delete; // Get all results with prefix starting from start ResultCode rangeWithPrefix(GraphSpaceID spaceId, diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 79bd7af85d0..4d49bf60972 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -366,11 +366,15 @@ nebula::cpp2::ErrorCode RaftPart::canAppendLogs() { nebula::cpp2::ErrorCode RaftPart::canAppendLogs(TermID termId) { DCHECK(!raftLock_.try_lock()); + nebula::cpp2::ErrorCode rc = canAppendLogs(); + if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { + return rc; + } if (UNLIKELY(term_ != termId)) { VLOG(2) << idStr_ << "Term has been updated, origin " << termId << ", new " << term_; return nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE; } - return canAppendLogs(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } void RaftPart::addLearner(const HostAddr& addr) { @@ -697,19 +701,19 @@ folly::Future RaftPart::appendLogAsync(ClusterID source // until majority accept the logs, the leadership changes, or // the partition stops VLOG(2) << idStr_ << "Calling appendLogsInternal()"; - AppendLogsIterator it(firstId, - termId, - std::move(swappedOutLogs), - [this](AtomicOp opCB) -> folly::Optional { - CHECK(opCB != nullptr); - auto opRet = opCB(); - if (!opRet.hasValue()) { - // Failed - sendingPromise_.setOneSingleValue( - nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); - } - return opRet; - }); + AppendLogsIterator it( + firstId, + termId, + std::move(swappedOutLogs), + [this](AtomicOp opCB) -> folly::Optional { + CHECK(opCB != nullptr); + auto opRet = opCB(); + if (!opRet.hasValue()) { + // Failed + sendingPromise_.setOneSingleValue(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); + } + return opRet; + }); appendLogsInternal(std::move(it), termId); return retFuture; @@ -964,19 +968,18 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, // continue to replicate the logs sendingPromise_ = std::move(cachingPromise_); cachingPromise_.reset(); - iter = AppendLogsIterator( - firstLogId, - currTerm, - std::move(logs_), - [this](AtomicOp op) -> folly::Optional { - auto opRet = op(); - if (!opRet.hasValue()) { - // Failed - sendingPromise_.setOneSingleValue( - nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); - } - return opRet; - }); + iter = AppendLogsIterator(firstLogId, + currTerm, + std::move(logs_), + [this](AtomicOp op) -> folly::Optional { + auto opRet = op(); + if (!opRet.hasValue()) { + // Failed + sendingPromise_.setOneSingleValue( + nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); + } + return opRet; + }); logs_.clear(); bufferOverFlow_ = false; } diff --git a/src/kvstore/raftex/RaftexService.cpp b/src/kvstore/raftex/RaftexService.cpp index ba7d926584f..16ffda8b6ff 100644 --- a/src/kvstore/raftex/RaftexService.cpp +++ b/src/kvstore/raftex/RaftexService.cpp @@ -155,10 +155,20 @@ void RaftexService::addPartition(std::shared_ptr part) { } void RaftexService::removePartition(std::shared_ptr part) { - folly::RWSpinLock::WriteHolder wh(partsLock_); - parts_.erase(std::make_pair(part->spaceId(), part->partitionId())); + using FType = decltype(folly::makeFuture()); + using FTValype = typename FType::value_type; // Stop the partition - part->stop(); + folly::makeFuture() + .thenValue([this, &part](FTValype) { + folly::RWSpinLock::WriteHolder wh(partsLock_); + parts_.erase(std::make_pair(part->spaceId(), part->partitionId())); + }) + // the part->stop() would wait for requestOnGoing_ in Host, and the requestOnGoing_ will + // release in response in ioThreadPool,this may cause deadlock, so doing it in another + // threadpool to avoid this condition + .via(folly::getGlobalCPUExecutor()) + .thenValue([part](FTValype) { part->stop(); }) + .wait(); } std::shared_ptr RaftexService::findPart(GraphSpaceID spaceId, PartitionID partId) { diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index 3bff012cdca..fcd96929f06 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -760,6 +760,78 @@ TEST(NebulaStoreTest, ThreeCopiesCheckpointTest) { } } +TEST(NebulaStoreTest, ReadSnapshotTest) { + auto partMan = std::make_unique(); + auto ioThreadPool = std::make_shared(4); + // space id : 1 , part id : 0 + partMan->partsMap_[1][0] = PartHosts(); + + VLOG(1) << "Total space num is " << partMan->partsMap_.size() + << ", total local partitions num is " << partMan->parts(HostAddr("", 0)).size(); + + fs::TempDir rootPath("/tmp/nebula_store_test.XXXXXX"); + std::vector paths; + paths.emplace_back(folly::stringPrintf("%s/disk1", rootPath.path())); + + KVOptions options; + options.dataPaths_ = std::move(paths); + options.partMan_ = std::move(partMan); + HostAddr local = {"", 0}; + auto store = + std::make_unique(std::move(options), ioThreadPool, local, getHandlers()); + store->init(); + sleep(FLAGS_raft_heartbeat_interval_secs); + // put kv + { + std::vector> expected, result; + auto atomic = [&]() -> std::string { + std::unique_ptr batchHolder = std::make_unique(); + for (auto i = 0; i < 20; i++) { + auto key = folly::stringPrintf("key_%d", i); + auto val = folly::stringPrintf("val_%d", i); + batchHolder->put(key.data(), val.data()); + expected.emplace_back(std::move(key), std::move(val)); + } + return encodeBatchValue(batchHolder->getBatch()); + }; + + folly::Baton baton; + auto callback = [&](nebula::cpp2::ErrorCode code) { + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }; + store->asyncAtomicOp(1, 0, atomic, callback); + baton.wait(); + + std::unique_ptr iter; + std::string prefix("key"); + const void* snapshot = store->GetSnapshot(1, 0); + SCOPE_EXIT { + store->ReleaseSnapshot(1, 0, snapshot); + }; + std::vector data; + for (auto i = 20; i < 40; i++) { + auto key = folly::stringPrintf("key_%d", i); + auto val = folly::stringPrintf("val_%d", i); + data.emplace_back(key, val); + } + folly::Baton baton2; + store->asyncMultiPut(1, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton2.post(); + }); + baton2.wait(); + auto ret = store->prefix(1, 0, prefix, &iter, false, snapshot); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret); + while (iter->valid()) { + result.emplace_back(iter->key(), iter->val()); + iter->next(); + } + std::sort(expected.begin(), expected.end()); + EXPECT_EQ(expected, result); + } +} + TEST(NebulaStoreTest, AtomicOpBatchTest) { auto partMan = std::make_unique(); auto ioThreadPool = std::make_shared(4); diff --git a/src/meta/MetaVersionMan.cpp b/src/meta/MetaVersionMan.cpp index dcfecfa4487..97a6f8c1276 100644 --- a/src/meta/MetaVersionMan.cpp +++ b/src/meta/MetaVersionMan.cpp @@ -377,8 +377,8 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVEngine* engine) { std::unique_ptr zoneIter; auto code = engine->prefix(zonePrefix, &zoneIter); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get active hosts failed"; - return Status::Error("Get hosts failed"); + LOG(ERROR) << "Get zones failed"; + return Status::Error("Get zones failed"); } while (zoneIter->valid()) { diff --git a/src/meta/processors/Common.h b/src/meta/processors/Common.h index a4a436dcda1..1c7ab8c1a1d 100644 --- a/src/meta/processors/Common.h +++ b/src/meta/processors/Common.h @@ -25,8 +25,7 @@ class LockUtils { GENERATE_LOCK(id); GENERATE_LOCK(workerId); GENERATE_LOCK(localId); - GENERATE_LOCK(tag); - GENERATE_LOCK(edge); + GENERATE_LOCK(tagAndEdge); GENERATE_LOCK(tagIndex); GENERATE_LOCK(edgeIndex); GENERATE_LOCK(service); diff --git a/src/meta/processors/admin/VerifyClientVersionProcessor.cpp b/src/meta/processors/admin/VerifyClientVersionProcessor.cpp index 3678c717a47..df00c1cdbff 100644 --- a/src/meta/processors/admin/VerifyClientVersionProcessor.cpp +++ b/src/meta/processors/admin/VerifyClientVersionProcessor.cpp @@ -9,7 +9,7 @@ DEFINE_bool(enable_client_white_list, true, "Turn on/off the client white list."); DEFINE_string(client_white_list, - nebula::getOriginVersion() + ":2.5.0:2.5.1:2.6.0", + nebula::getOriginVersion() + ":3.0.0", "A white list for different client versions, separate with colon."); namespace nebula { diff --git a/src/meta/processors/index/FTIndexProcessor.cpp b/src/meta/processors/index/FTIndexProcessor.cpp index 2df5241d791..3bf899d6280 100644 --- a/src/meta/processors/index/FTIndexProcessor.cpp +++ b/src/meta/processors/index/FTIndexProcessor.cpp @@ -15,8 +15,8 @@ void CreateFTIndexProcessor::process(const cpp2::CreateFTIndexReq& req) { const auto& index = req.get_index(); const std::string& name = req.get_fulltext_index_name(); CHECK_SPACE_ID_AND_RETURN(index.get_space_id()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto isEdge = index.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type; - folly::SharedMutex::ReadHolder rHolder(isEdge ? LockUtils::edgeLock() : LockUtils::tagLock()); auto schemaPrefix = isEdge ? MetaKeyUtils::schemaEdgePrefix( index.get_space_id(), index.get_depend_schema().get_edge_type()) : MetaKeyUtils::schemaTagPrefix( @@ -100,7 +100,9 @@ void CreateFTIndexProcessor::process(const cpp2::CreateFTIndexReq& req) { onFinished(); return; } - if (index.get_depend_schema() == indexItem.get_depend_schema()) { + // Because tagId/edgeType is the space range, judge the spaceId and schemaId + if (index.get_space_id() == indexItem.get_space_id() && + index.get_depend_schema() == indexItem.get_depend_schema()) { LOG(ERROR) << "Depends on the same schema , index : " << indexName; handleErrorCode(nebula::cpp2::ErrorCode::E_EXISTED); onFinished(); diff --git a/src/meta/processors/job/BalanceJobExecutor.cpp b/src/meta/processors/job/BalanceJobExecutor.cpp index bb52f199319..d354fff681c 100644 --- a/src/meta/processors/job/BalanceJobExecutor.cpp +++ b/src/meta/processors/job/BalanceJobExecutor.cpp @@ -88,6 +88,36 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::save(const std::string& k, const std return rc; } +void BalanceJobExecutor::insertOneTask( + const BalanceTask& task, std::map>* existTasks) { + std::vector& taskVec = existTasks->operator[](task.getPartId()); + if (taskVec.empty()) { + taskVec.emplace_back(task); + } else { + for (auto it = taskVec.begin(); it != taskVec.end(); it++) { + if (task.getDstHost() == it->getSrcHost() && task.getSrcHost() == it->getDstHost()) { + taskVec.erase(it); + return; + } else if (task.getDstHost() == it->getSrcHost()) { + BalanceTask newTask(task); + newTask.setDstHost(it->getDstHost()); + taskVec.erase(it); + insertOneTask(newTask, existTasks); + return; + } else if (task.getSrcHost() == it->getDstHost()) { + BalanceTask newTask(task); + newTask.setSrcHost(it->getSrcHost()); + taskVec.erase(it); + insertOneTask(newTask, existTasks); + return; + } else { + continue; + } + } + taskVec.emplace_back(task); + } +} + nebula::cpp2::ErrorCode SpaceInfo::loadInfo(GraphSpaceID spaceId, kvstore::KVStore* kvstore) { spaceId_ = spaceId; std::string spaceKey = MetaKeyUtils::spaceKey(spaceId); diff --git a/src/meta/processors/job/BalanceJobExecutor.h b/src/meta/processors/job/BalanceJobExecutor.h index a284c3b6485..64c9c7286b3 100644 --- a/src/meta/processors/job/BalanceJobExecutor.h +++ b/src/meta/processors/job/BalanceJobExecutor.h @@ -73,6 +73,9 @@ class BalanceJobExecutor : public MetaJobExecutor { return Status::OK(); } + void insertOneTask(const BalanceTask& task, + std::map>* existTasks); + protected: std::unique_ptr plan_; std::unique_ptr executor_; diff --git a/src/meta/processors/job/BalancePlan.cpp b/src/meta/processors/job/BalancePlan.cpp index 84ed777e57f..40678b0c528 100644 --- a/src/meta/processors/job/BalancePlan.cpp +++ b/src/meta/processors/job/BalancePlan.cpp @@ -234,7 +234,6 @@ ErrorOr> BalancePlan::getBalan if (task.ret_ == BalanceTaskResult::FAILED || task.ret_ == BalanceTaskResult::INVALID) { task.ret_ = BalanceTaskResult::IN_PROGRESS; } - task.status_ = BalanceTaskStatus::START; auto activeHostRet = ActiveHostsMan::isLived(kv, task.dst_); if (!nebula::ok(activeHostRet)) { auto retCode = nebula::error(activeHostRet); diff --git a/src/meta/processors/job/BalancePlan.h b/src/meta/processors/job/BalancePlan.h index 8aed704c9a9..902633ef161 100644 --- a/src/meta/processors/job/BalancePlan.h +++ b/src/meta/processors/job/BalancePlan.h @@ -95,6 +95,11 @@ class BalancePlan { void setFinishCallBack(std::function func); + template + void insertTask(InputIterator first, InputIterator last) { + tasks_.insert(tasks_.end(), first, last); + } + private: JobDescription jobDescription_; kvstore::KVStore* kv_ = nullptr; diff --git a/src/meta/processors/job/BalanceTask.h b/src/meta/processors/job/BalanceTask.h index b9fbc36acfd..a07d4869c2a 100644 --- a/src/meta/processors/job/BalanceTask.h +++ b/src/meta/processors/job/BalanceTask.h @@ -30,6 +30,7 @@ class BalanceTask { FRIEND_TEST(BalanceTest, TryToRecoveryTest); FRIEND_TEST(BalanceTest, RecoveryTest); FRIEND_TEST(BalanceTest, StopPlanTest); + FRIEND_TEST(BalanceTest, BalanceZonePlanComplexTest); public: BalanceTask() = default; @@ -68,6 +69,26 @@ class BalanceTask { return ret_; } + const HostAddr& getSrcHost() const { + return src_; + } + + const HostAddr& getDstHost() const { + return dst_; + } + + void setSrcHost(const HostAddr& host) { + src_ = host; + } + + void setDstHost(const HostAddr& host) { + dst_ = host; + } + + PartitionID getPartId() const { + return partId_; + } + private: std::string buildTaskId() { return folly::stringPrintf("%d, %d:%d", jobId_, spaceId_, partId_); diff --git a/src/meta/processors/job/DataBalanceJobExecutor.cpp b/src/meta/processors/job/DataBalanceJobExecutor.cpp index e2abaca40fc..e5666f94202 100644 --- a/src/meta/processors/job/DataBalanceJobExecutor.cpp +++ b/src/meta/processors/job/DataBalanceJobExecutor.cpp @@ -63,23 +63,26 @@ Status DataBalanceJobExecutor::buildBalancePlan() { return l->parts_.size() < r->parts_.size(); }); } - plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_)); + std::map> existTasks; // move parts of lost hosts to active hosts in the same zone for (auto& zoneHostEntry : lostZoneHost) { const std::string& zoneName = zoneHostEntry.first; std::vector& lostHostVec = zoneHostEntry.second; std::vector& activeVec = activeSortedHost[zoneName]; + if (activeVec.size() == 0) { + return Status::Error("zone %s has no host", zoneName.c_str()); + } for (Host* host : lostHostVec) { for (PartitionID partId : host->parts_) { Host* dstHost = activeVec.front(); dstHost->parts_.insert(partId); - plan_->addTask(BalanceTask(jobId_, - spaceInfo_.spaceId_, - partId, - host->host_, - dstHost->host_, - kvstore_, - adminClient_)); + existTasks[partId].emplace_back(jobId_, + spaceInfo_.spaceId_, + partId, + host->host_, + dstHost->host_, + kvstore_, + adminClient_); for (size_t i = 0; i < activeVec.size() - 1; i++) { if (activeVec[i]->parts_.size() > activeVec[i + 1]->parts_.size()) { std::swap(activeVec[i], activeVec[i + 1]); @@ -93,7 +96,7 @@ Status DataBalanceJobExecutor::buildBalancePlan() { } lostZoneHost.clear(); // rebalance for hosts in a zone - auto balanceHostVec = [this](std::vector& hostVec) -> std::vector { + auto balanceHostVec = [this, &existTasks](std::vector& hostVec) { size_t totalPartNum = 0; size_t avgPartNum = 0; for (Host* h : hostVec) { @@ -101,7 +104,7 @@ Status DataBalanceJobExecutor::buildBalancePlan() { } if (hostVec.empty()) { LOG(ERROR) << "rebalance error: zone has no host"; - return {}; + return; } avgPartNum = totalPartNum / hostVec.size(); size_t remainder = totalPartNum - avgPartNum * hostVec.size(); @@ -109,7 +112,6 @@ Status DataBalanceJobExecutor::buildBalancePlan() { size_t leftEnd = 0; size_t rightBegin = 0; size_t rightEnd = hostVec.size(); - std::vector tasks; for (size_t i = 0; i < hostVec.size(); i++) { if (avgPartNum <= hostVec[i]->parts_.size()) { leftEnd = i; @@ -136,13 +138,14 @@ Status DataBalanceJobExecutor::buildBalancePlan() { PartitionID partId = *(srcHost->parts_.begin()); hostVec[leftBegin]->parts_.insert(partId); srcHost->parts_.erase(partId); - tasks.emplace_back(jobId_, - spaceInfo_.spaceId_, - partId, - srcHost->host_, - hostVec[leftBegin]->host_, - kvstore_, - adminClient_); + insertOneTask(BalanceTask(jobId_, + spaceInfo_.spaceId_, + partId, + srcHost->host_, + hostVec[leftBegin]->host_, + kvstore_, + adminClient_), + &existTasks); size_t leftIndex = leftBegin; for (; leftIndex < leftEnd - 1; leftIndex++) { if (hostVec[leftIndex]->parts_.size() > hostVec[leftIndex + 1]->parts_.size()) { @@ -158,18 +161,25 @@ Status DataBalanceJobExecutor::buildBalancePlan() { leftEnd = rightBegin; } } - return tasks; }; for (auto& pair : activeSortedHost) { std::vector& hvec = pair.second; - std::vector tasks = balanceHostVec(hvec); - for (BalanceTask& task : tasks) { - plan_->addTask(std::move(task)); - } + balanceHostVec(hvec); } - if (plan_->tasks().empty()) { + bool emty = std::find_if(existTasks.begin(), + existTasks.end(), + [](std::pair>& p) { + return !p.second.empty(); + }) == existTasks.end(); + if (emty) { return Status::Balanced(); } + plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_)); + std::for_each(existTasks.begin(), + existTasks.end(), + [this](std::pair>& p) { + plan_->insertTask(p.second.begin(), p.second.end()); + }); nebula::cpp2::ErrorCode rc = plan_->saveInStore(); if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::Error("save balance zone plan failed"); diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index 795ade3c73c..33753d71575 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -144,7 +144,7 @@ void JobManager::scheduleThread() { // @return: true if all task dispatched, else false bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) { - std::lock_guard lk(muJobFinished_); + std::lock_guard lk(muJobFinished_); std::unique_ptr je = JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_); JobExecutor* jobExec = je.get(); @@ -174,7 +174,7 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) { if (jobExec->isMetaJob()) { jobExec->setFinishCallBack([this, jobDesc](meta::cpp2::JobStatus status) { if (status == meta::cpp2::JobStatus::STOPPED) { - std::lock_guard lkg(muJobFinished_); + std::lock_guard lkg(muJobFinished_); cleanJob(jobDesc.getJobId()); return nebula::cpp2::ErrorCode::SUCCEEDED; } else { @@ -206,7 +206,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(JobID jobId, cpp2::JobStatus job LOG(INFO) << folly::sformat( "{}, jobId={}, result={}", __func__, jobId, apache::thrift::util::enumNameSafe(jobStatus)); // normal job finish may race to job stop - std::lock_guard lk(muJobFinished_); + std::lock_guard lk(muJobFinished_); auto optJobDescRet = JobDescription::loadJobDescription(jobId, kvStore_); if (!nebula::ok(optJobDescRet)) { LOG(WARNING) << folly::sformat("can't load job, jobId={}", jobId); diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 52e73584058..f8532b61dfa 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -158,7 +158,9 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab AdminClient* adminClient_{nullptr}; std::mutex muReportFinish_; - std::mutex muJobFinished_; + // The reason of using recursive_mutex is that, it's possible for a meta job try to get this lock + // in finish-callback in the same thread with runJobInternal + std::recursive_mutex muJobFinished_; std::atomic status_ = JbmgrStatus::NOT_START; }; diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp index a152bd02d75..14a514e2dd6 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp @@ -113,7 +113,7 @@ HostAddr ZoneBalanceJobExecutor::insertPartIntoZone( nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones( std::vector* sortedActiveZones, std::map>* sortedZoneHosts, - std::vector* tasks) { + std::map>* existTasks) { std::vector& sortedActiveZonesRef = *sortedActiveZones; std::map>& sortedZoneHostsRef = *sortedZoneHosts; int32_t totalPartNum = 0; @@ -147,8 +147,9 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones( for (int32_t leftIndex = leftBegin; leftIndex < leftEnd; leftIndex++) { if (!sortedActiveZonesRef[leftIndex]->partExist(partId)) { HostAddr dst = insertPartIntoZone(sortedZoneHosts, sortedActiveZonesRef[leftIndex], partId); - tasks->emplace_back( - jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_); + insertOneTask( + BalanceTask(jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_), + existTasks); int32_t newLeftIndex = leftIndex; for (; newLeftIndex < leftEnd - 1; newLeftIndex++) { if (sortedActiveZonesRef[newLeftIndex]->partNum_ > @@ -242,7 +243,6 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() { if (activeSize < spaceInfo_.replica_) { return Status::Error("Not enough alive zones to hold replica"); } - std::vector tasks; std::vector sortedActiveZones; sortedActiveZones.reserve(activeZones.size()); @@ -285,6 +285,7 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() { return ha; }; + std::map> existTasks; // move parts of lost zones to active zones for (auto& zoneMapEntry : lostZones) { Zone* zone = zoneMapEntry.second; @@ -293,7 +294,7 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() { Host& host = hostMapEntry.second; for (PartitionID partId : host.parts_) { HostAddr dst = chooseZoneToInsert(partId); - tasks.emplace_back( + existTasks[partId].emplace_back( jobId_, spaceInfo_.spaceId_, partId, hostAddr, dst, kvstore_, adminClient_); } host.parts_.clear(); @@ -302,15 +303,23 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() { } // all parts of lost zones have moved to active zones, then rebalance the active zones - nebula::cpp2::ErrorCode rc = rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &tasks); + nebula::cpp2::ErrorCode rc = + rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &existTasks); - if (tasks.empty() || rc != nebula::cpp2::ErrorCode::SUCCEEDED) { + bool emty = std::find_if(existTasks.begin(), + existTasks.end(), + [](std::pair>& p) { + return !p.second.empty(); + }) == existTasks.end(); + if (emty || rc != nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::Balanced(); } plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_)); - for (BalanceTask& task : tasks) { - plan_->addTask(std::move(task)); - } + std::for_each(existTasks.begin(), + existTasks.end(), + [this](std::pair>& p) { + plan_->insertTask(p.second.begin(), p.second.end()); + }); rc = plan_->saveInStore(); if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::Error("save balance zone plan failed"); diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.h b/src/meta/processors/job/ZoneBalanceJobExecutor.h index 798675191b5..e264e7b822a 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.h +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.h @@ -17,6 +17,8 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor { FRIEND_TEST(BalanceTest, BalanceZoneRemainderPlanTest); FRIEND_TEST(BalanceTest, NormalZoneTest); FRIEND_TEST(BalanceTest, StopPlanTest); + FRIEND_TEST(BalanceTest, BalanceZonePlanComplexTest); + FRIEND_TEST(BalanceTest, NormalZoneComplexTest); public: ZoneBalanceJobExecutor(JobDescription jobDescription, @@ -25,6 +27,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor { const std::vector& params) : BalanceJobExecutor(jobDescription.getJobId(), kvstore, adminClient, params), jobDescription_(jobDescription) {} + nebula::cpp2::ErrorCode prepare() override; nebula::cpp2::ErrorCode stop() override; @@ -38,7 +41,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor { nebula::cpp2::ErrorCode rebalanceActiveZones( std::vector* sortedActiveZones, std::map>* sortedZoneHosts, - std::vector* tasks); + std::map>* existTasks); private: std::vector lostZones_; diff --git a/src/meta/processors/parts/CreateSpaceAsProcessor.cpp b/src/meta/processors/parts/CreateSpaceAsProcessor.cpp index b61ab6fd7c7..5a7f1dd03f4 100644 --- a/src/meta/processors/parts/CreateSpaceAsProcessor.cpp +++ b/src/meta/processors/parts/CreateSpaceAsProcessor.cpp @@ -132,7 +132,7 @@ ErrorOr> CreateSpaceAsProcesso ErrorOr> CreateSpaceAsProcessor::makeNewTags( GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::schemaTagsPrefix(oldSpaceId); auto tagPrefix = doPrefix(prefix); if (!nebula::ok(tagPrefix)) { @@ -164,7 +164,7 @@ ErrorOr> CreateSpaceAsProcesso ErrorOr> CreateSpaceAsProcessor::makeNewEdges( GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::schemaEdgesPrefix(oldSpaceId); auto edgePrefix = doPrefix(prefix); if (!nebula::ok(edgePrefix)) { @@ -196,7 +196,7 @@ ErrorOr> CreateSpaceAsProcesso ErrorOr> CreateSpaceAsProcessor::makeNewIndexes( GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::indexPrefix(oldSpaceId); auto indexPrefix = doPrefix(prefix); if (!nebula::ok(indexPrefix)) { diff --git a/src/meta/processors/schema/AlterEdgeProcessor.cpp b/src/meta/processors/schema/AlterEdgeProcessor.cpp index e2a552f1fdf..c2b6031cf53 100644 --- a/src/meta/processors/schema/AlterEdgeProcessor.cpp +++ b/src/meta/processors/schema/AlterEdgeProcessor.cpp @@ -13,10 +13,10 @@ namespace meta { void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto edgeName = req.get_edge_name(); + const auto& edgeName = req.get_edge_name(); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock()); + folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); auto ret = getEdgeType(spaceId, edgeName); if (!nebula::ok(ret)) { auto retCode = nebula::error(ret); diff --git a/src/meta/processors/schema/AlterTagProcessor.cpp b/src/meta/processors/schema/AlterTagProcessor.cpp index 31f0ea41173..2284710ec92 100644 --- a/src/meta/processors/schema/AlterTagProcessor.cpp +++ b/src/meta/processors/schema/AlterTagProcessor.cpp @@ -13,10 +13,10 @@ namespace meta { void AlterTagProcessor::process(const cpp2::AlterTagReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto tagName = req.get_tag_name(); + const auto& tagName = req.get_tag_name(); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagLock()); + folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); auto ret = getTagId(spaceId, tagName); if (!nebula::ok(ret)) { auto retCode = nebula::error(ret); diff --git a/src/meta/processors/schema/CreateEdgeProcessor.cpp b/src/meta/processors/schema/CreateEdgeProcessor.cpp index 671d5ddab16..3b9a65fe7a5 100644 --- a/src/meta/processors/schema/CreateEdgeProcessor.cpp +++ b/src/meta/processors/schema/CreateEdgeProcessor.cpp @@ -13,28 +13,25 @@ namespace meta { void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto edgeName = req.get_edge_name(); - { - // if there is an tag of the same name - // TODO: there exists race condition, we should address it in the future - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock()); - auto conflictRet = getTagId(spaceId, edgeName); - if (nebula::ok(conflictRet)) { - LOG(ERROR) << "Failed to create edge `" << edgeName - << "': some tag with the same name already exists."; - resp_.id_ref() = to(nebula::value(conflictRet), EntryType::EDGE); - handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + const auto& edgeName = req.get_edge_name(); + folly::SharedMutex::WriteHolder holder(LockUtils::tagAndEdgeLock()); + // Check if the tag with same name exists + auto conflictRet = getTagId(spaceId, edgeName); + if (nebula::ok(conflictRet)) { + LOG(ERROR) << "Failed to create edge `" << edgeName + << "': some tag with the same name already exists."; + resp_.id_ref() = to(nebula::value(conflictRet), EntryType::EDGE); + handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + onFinished(); + return; + } else { + auto retCode = nebula::error(conflictRet); + if (retCode != nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND) { + LOG(ERROR) << "Failed to create edge " << edgeName << " error " + << apache::thrift::util::enumNameSafe(retCode); + handleErrorCode(retCode); onFinished(); return; - } else { - auto retCode = nebula::error(conflictRet); - if (retCode != nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND) { - LOG(ERROR) << "Failed to create edge " << edgeName << " error " - << apache::thrift::util::enumNameSafe(retCode); - handleErrorCode(retCode); - onFinished(); - return; - } } } @@ -49,7 +46,6 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) { schema.columns_ref() = std::move(columns); schema.schema_prop_ref() = req.get_schema().get_schema_prop(); - folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock()); auto ret = getEdgeType(spaceId, edgeName); if (nebula::ok(ret)) { if (req.get_if_not_exists()) { diff --git a/src/meta/processors/schema/CreateTagProcessor.cpp b/src/meta/processors/schema/CreateTagProcessor.cpp index b5f3d39124b..dd1cd80f37b 100644 --- a/src/meta/processors/schema/CreateTagProcessor.cpp +++ b/src/meta/processors/schema/CreateTagProcessor.cpp @@ -13,28 +13,26 @@ namespace meta { void CreateTagProcessor::process(const cpp2::CreateTagReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto tagName = req.get_tag_name(); - { - // if there is an edge of the same name - // TODO: there exists race condition, we should address it in the future - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock()); - auto conflictRet = getEdgeType(spaceId, tagName); - if (nebula::ok(conflictRet)) { - LOG(ERROR) << "Failed to create tag `" << tagName - << "': some edge with the same name already exists."; - resp_.id_ref() = to(nebula::value(conflictRet), EntryType::TAG); - handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + const auto& tagName = req.get_tag_name(); + folly::SharedMutex::WriteHolder holder(LockUtils::tagAndEdgeLock()); + + // Check if the edge with same name exists + auto conflictRet = getEdgeType(spaceId, tagName); + if (nebula::ok(conflictRet)) { + LOG(ERROR) << "Failed to create tag `" << tagName + << "': some edge with the same name already exists."; + resp_.id_ref() = to(nebula::value(conflictRet), EntryType::TAG); + handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + onFinished(); + return; + } else { + auto retCode = nebula::error(conflictRet); + if (retCode != nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND) { + LOG(ERROR) << "Failed to create tag " << tagName << " error " + << apache::thrift::util::enumNameSafe(retCode); + handleErrorCode(retCode); onFinished(); return; - } else { - auto retCode = nebula::error(conflictRet); - if (retCode != nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND) { - LOG(ERROR) << "Failed to create tag " << tagName << " error " - << apache::thrift::util::enumNameSafe(retCode); - handleErrorCode(retCode); - onFinished(); - return; - } } } @@ -49,7 +47,6 @@ void CreateTagProcessor::process(const cpp2::CreateTagReq& req) { schema.columns_ref() = std::move(columns); schema.schema_prop_ref() = req.get_schema().get_schema_prop(); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagLock()); auto ret = getTagId(spaceId, tagName); if (nebula::ok(ret)) { if (req.get_if_not_exists()) { diff --git a/src/meta/processors/schema/DropEdgeProcessor.cpp b/src/meta/processors/schema/DropEdgeProcessor.cpp index 1315681bbb3..ca42af470ab 100644 --- a/src/meta/processors/schema/DropEdgeProcessor.cpp +++ b/src/meta/processors/schema/DropEdgeProcessor.cpp @@ -13,8 +13,8 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) { CHECK_SPACE_ID_AND_RETURN(spaceId); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock()); - auto edgeName = req.get_edge_name(); + folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); + const auto& edgeName = req.get_edge_name(); EdgeType edgeType; auto indexKey = MetaKeyUtils::indexEdgeKey(spaceId, edgeName); diff --git a/src/meta/processors/schema/DropTagProcessor.cpp b/src/meta/processors/schema/DropTagProcessor.cpp index 08cc3ff5331..8fcf9cf74d3 100644 --- a/src/meta/processors/schema/DropTagProcessor.cpp +++ b/src/meta/processors/schema/DropTagProcessor.cpp @@ -13,8 +13,8 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) { CHECK_SPACE_ID_AND_RETURN(spaceId); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagLock()); - auto tagName = req.get_tag_name(); + folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); + const auto& tagName = req.get_tag_name(); TagID tagId; auto indexKey = MetaKeyUtils::indexTagKey(spaceId, tagName); diff --git a/src/meta/processors/schema/GetEdgeProcessor.cpp b/src/meta/processors/schema/GetEdgeProcessor.cpp index 0ae0e3a5443..b8162cdcd58 100644 --- a/src/meta/processors/schema/GetEdgeProcessor.cpp +++ b/src/meta/processors/schema/GetEdgeProcessor.cpp @@ -11,10 +11,10 @@ namespace meta { void GetEdgeProcessor::process(const cpp2::GetEdgeReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto edgeName = req.get_edge_name(); + const auto& edgeName = req.get_edge_name(); auto ver = req.get_version(); - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto edgeTypeRet = getEdgeType(spaceId, edgeName); if (!nebula::ok(edgeTypeRet)) { LOG(ERROR) << "Get edge " << edgeName << " failed."; diff --git a/src/meta/processors/schema/GetTagProcessor.cpp b/src/meta/processors/schema/GetTagProcessor.cpp index 82e726e34ad..e085d2b9d9f 100644 --- a/src/meta/processors/schema/GetTagProcessor.cpp +++ b/src/meta/processors/schema/GetTagProcessor.cpp @@ -11,10 +11,10 @@ namespace meta { void GetTagProcessor::process(const cpp2::GetTagReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto tagName = req.get_tag_name(); + const auto& tagName = req.get_tag_name(); auto ver = req.get_version(); - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto tagIdRet = getTagId(spaceId, tagName); if (!nebula::ok(tagIdRet)) { LOG(ERROR) << "Get tag " << tagName << " failed."; diff --git a/src/meta/processors/schema/ListEdgesProcessor.cpp b/src/meta/processors/schema/ListEdgesProcessor.cpp index 3e5e2691762..ebe41692dae 100644 --- a/src/meta/processors/schema/ListEdgesProcessor.cpp +++ b/src/meta/processors/schema/ListEdgesProcessor.cpp @@ -12,7 +12,7 @@ void ListEdgesProcessor::process(const cpp2::ListEdgesReq &req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::schemaEdgesPrefix(spaceId); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { diff --git a/src/meta/processors/schema/ListTagsProcessor.cpp b/src/meta/processors/schema/ListTagsProcessor.cpp index 26da7cd41d6..ee4d3eff9d7 100644 --- a/src/meta/processors/schema/ListTagsProcessor.cpp +++ b/src/meta/processors/schema/ListTagsProcessor.cpp @@ -12,7 +12,7 @@ void ListTagsProcessor::process(const cpp2::ListTagsReq &req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::schemaTagsPrefix(spaceId); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { diff --git a/src/meta/processors/zone/DivideZoneProcessor.cpp b/src/meta/processors/zone/DivideZoneProcessor.cpp index 6b43915a5ad..ec1084f81b5 100644 --- a/src/meta/processors/zone/DivideZoneProcessor.cpp +++ b/src/meta/processors/zone/DivideZoneProcessor.cpp @@ -14,6 +14,7 @@ void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { auto zoneName = req.get_zone_name(); auto zoneKey = MetaKeyUtils::zoneKey(zoneName); auto zoneValueRet = doGet(zoneKey); + // Check the source zone exist or not if (!nebula::ok(zoneValueRet)) { LOG(ERROR) << "Zone " << zoneName << " not existed error: " << apache::thrift::util::enumNameSafe(nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND); @@ -24,6 +25,23 @@ void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { auto& zoneItems = req.get_zone_items(); auto zoneHosts = MetaKeyUtils::parseZoneHosts(std::move(nebula::value(zoneValueRet))); + + // if the source zone have only one host, it should not be split + if (zoneHosts.size() == 1) { + LOG(ERROR) << "Only one host is no need to split"; + handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); + onFinished(); + return; + } + + // if the target zone list hold only one item, it should not be split + if (zoneItems.size() == 1) { + LOG(ERROR) << "Only one zone is no need to split"; + handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); + onFinished(); + return; + } + if (zoneItems.size() > zoneHosts.size()) { LOG(ERROR) << "Zone Item should not greater than hosts size"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); @@ -35,12 +53,14 @@ void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { std::unordered_set totalHosts; size_t totalHostsSize = 0; auto batchHolder = std::make_unique(); + // Remove original zone + batchHolder->remove(std::move(zoneKey)); nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED; for (auto iter = zoneItems.begin(); iter != zoneItems.end(); iter++) { auto zone = iter->first; auto hosts = iter->second; auto valueRet = doGet(MetaKeyUtils::zoneKey(zone)); - if (nebula::ok(valueRet)) { + if (nebula::ok(valueRet) && zone != zoneName) { LOG(ERROR) << "Zone " << zone << " have existed"; code = nebula::cpp2::ErrorCode::E_EXISTED; break; @@ -48,9 +68,9 @@ void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { auto it = std::find(zoneNames.begin(), zoneNames.end(), zone); if (it == zoneNames.end()) { - LOG(ERROR) << "Zone have duplicated name"; zoneNames.emplace_back(zone); } else { + LOG(ERROR) << "Zone have duplicated name " << zone; code = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } @@ -109,8 +129,6 @@ void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { return; } - // Remove original zone - batchHolder->remove(std::move(zoneKey)); code = updateSpacesZone(batchHolder.get(), zoneName, zoneNames); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { handleErrorCode(code); diff --git a/src/meta/processors/zone/DropHostsProcessor.cpp b/src/meta/processors/zone/DropHostsProcessor.cpp index f9138b7061e..c27bc9f53c6 100644 --- a/src/meta/processors/zone/DropHostsProcessor.cpp +++ b/src/meta/processors/zone/DropHostsProcessor.cpp @@ -137,10 +137,9 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { ret = hostExist(hostKey); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "The host " << host << " not existed!"; - code = nebula::cpp2::ErrorCode::E_NO_HOSTS; - break; + } else { + holder->remove(std::move(hostKey)); } - holder->remove(std::move(hostKey)); } if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp index 340e8f3260e..0221a639543 100644 --- a/src/meta/test/BalancerTest.cpp +++ b/src/meta/test/BalancerTest.cpp @@ -246,6 +246,45 @@ TEST(BalanceTest, BalanceZonePlanTest) { EXPECT_EQ(balancer.spaceInfo_.zones_["zone5"].partNum_, 9); } +TEST(BalanceTest, BalanceZonePlanComplexTest) { + fs::TempDir rootPath("/tmp/BalanceZoneTest.XXXXXX"); + std::unique_ptr store = MockCluster::initMetaKV(rootPath.path()); + std::pair>>> pair1( + "z1", {{{"127.0.0.1", 11}, {}}}); + std::pair>>> pair2( + "z2", {{{"127.0.0.1", 12}, {}}}); + std::pair>>> pair3( + "z3", {{{"127.0.0.1", 13}, {}}}); + std::pair>>> pair4( + "z4", {{{"127.0.0.1", 14}, {}}}); + std::pair>>> pair5( + "z5", {{{"127.0.0.1", 15}, {}}}); + for (int32_t i = 1; i <= 512; i++) { + pair1.second.front().second.push_back(i); + pair2.second.front().second.push_back(i); + pair3.second.front().second.push_back(i); + } + SpaceInfo spaceInfo = createSpaceInfo("space1", 1, 3, {pair1, pair2, pair3, pair4, pair5}); + ZoneBalanceJobExecutor balancer(JobDescription(), store.get(), nullptr, {}); + balancer.spaceInfo_ = spaceInfo; + Status status = balancer.buildBalancePlan(); + EXPECT_EQ(status, Status::OK()); + EXPECT_EQ(balancer.spaceInfo_.zones_["z1"].partNum_, 308); + EXPECT_EQ(balancer.spaceInfo_.zones_["z2"].partNum_, 307); + EXPECT_EQ(balancer.spaceInfo_.zones_["z3"].partNum_, 307); + EXPECT_EQ(balancer.spaceInfo_.zones_["z4"].partNum_, 307); + EXPECT_EQ(balancer.spaceInfo_.zones_["z5"].partNum_, 307); + balancer.lostZones_ = {"z1"}; + status = balancer.buildBalancePlan(); + EXPECT_EQ(status, Status::OK()); + EXPECT_EQ(balancer.plan_->tasks().size(), 389); + auto tasks = balancer.plan_->tasks(); + EXPECT_EQ(balancer.spaceInfo_.zones_["z2"].partNum_, 384); + EXPECT_EQ(balancer.spaceInfo_.zones_["z3"].partNum_, 384); + EXPECT_EQ(balancer.spaceInfo_.zones_["z4"].partNum_, 384); + EXPECT_EQ(balancer.spaceInfo_.zones_["z5"].partNum_, 384); +} + TEST(BalanceTest, BalanceZoneRemainderPlanTest) { fs::TempDir rootPath("/tmp/BalanceZoneTest.XXXXXX"); std::unique_ptr store = MockCluster::initMetaKV(rootPath.path()); @@ -668,6 +707,34 @@ void verifyMetaZone(kvstore::KVStore* kv, EXPECT_EQ(zoneSet, expectZones); } +void verifyZonePartNum(kvstore::KVStore* kv, + GraphSpaceID spaceId, + const std::map& zones) { + std::map hostZone; + std::map zoneNum; + std::unique_ptr iter; + for (const auto& [zone, num] : zones) { + std::string zoneKey = MetaKeyUtils::zoneKey(zone); + std::string value; + kv->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &value); + auto hosts = MetaKeyUtils::parseZoneHosts(value); + for (auto& host : hosts) { + hostZone[host] = zone; + } + zoneNum[zone] = 0; + } + const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId); + kv->prefix(kDefaultSpaceId, kDefaultPartId, partPrefix, &iter); + while (iter->valid()) { + auto hosts = MetaKeyUtils::parsePartVal(iter->val()); + for (auto& host : hosts) { + zoneNum[hostZone[host]]++; + } + iter->next(); + } + EXPECT_EQ(zoneNum, zones); +} + JobDescription makeJobDescription(kvstore::KVStore* kv, cpp2::AdminCmd cmd) { JobDescription jd(testJobId.fetch_add(1, std::memory_order_relaxed), cmd, {}); std::vector data; @@ -779,8 +846,12 @@ TEST(BalanceTest, RecoveryTest) { partCount, 6); balancer.recovery(); - verifyBalanceTask( - kv, balancer.jobId_, BalanceTaskStatus::START, BalanceTaskResult::IN_PROGRESS, partCount, 6); + verifyBalanceTask(kv, + balancer.jobId_, + BalanceTaskStatus::CATCH_UP_DATA, + BalanceTaskResult::IN_PROGRESS, + partCount, + 6); baton.reset(); balancer.setFinishCallBack([&](meta::cpp2::JobStatus) { baton.post(); diff --git a/src/meta/test/IndexProcessorTest.cpp b/src/meta/test/IndexProcessorTest.cpp index a6fdd518372..8fb05308872 100644 --- a/src/meta/test/IndexProcessorTest.cpp +++ b/src/meta/test/IndexProcessorTest.cpp @@ -1558,6 +1558,15 @@ void mockSchemas(kvstore::KVStore* kv) { schemas.emplace_back(MetaKeyUtils::schemaEdgeKey(1, edgeType, ver), MetaKeyUtils::schemaVal("test_edge", srcsch)); + // space 2 + schemas.emplace_back(MetaKeyUtils::indexTagKey(2, "test_tag"), tagIdVal); + schemas.emplace_back(MetaKeyUtils::schemaTagKey(2, tagId, ver), + MetaKeyUtils::schemaVal("test_tag", srcsch)); + + schemas.emplace_back(MetaKeyUtils::indexEdgeKey(2, "test_edge"), edgeTypeVal); + schemas.emplace_back(MetaKeyUtils::schemaEdgeKey(2, edgeType, ver), + MetaKeyUtils::schemaVal("test_edge", srcsch)); + folly::Baton baton; kv->asyncMultiPut(0, 0, std::move(schemas), [&](nebula::cpp2::ErrorCode code) { ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); @@ -1570,6 +1579,7 @@ TEST(IndexProcessorTest, CreateFTIndexTest) { fs::TempDir rootPath("/tmp/CreateFTIndexTest.XXXXXX"); auto kv = MockCluster::initMetaKV(rootPath.path()); TestUtils::assembleSpace(kv.get(), 1, 1); + TestUtils::assembleSpace(kv.get(), 2, 1, 1, 1, true); mockSchemas(kv.get()); for (auto id : {5, 6}) { // expected error. column col_fixed_string_2 is fixed_string, @@ -1627,7 +1637,7 @@ TEST(IndexProcessorTest, CreateFTIndexTest) { } else { schemaId.edge_type_ref() = 6; } - index.space_id_ref() = 2; + index.space_id_ref() = 3; index.depend_schema_ref() = std::move(schemaId); index.fields_ref() = {"col_string"}; req.fulltext_index_name_ref() = "test_ft_index"; @@ -1825,6 +1835,63 @@ TEST(IndexProcessorTest, CreateFTIndexTest) { auto resp = std::move(f).get(); ASSERT_EQ(nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND, resp.get_code()); } + + // expected success + // Different spaces, the same tag name(same tagId), create full-text indexes with different names. + { + { + for (auto i = 0; i < 2; ++i) { + cpp2::CreateFTIndexReq req; + cpp2::FTIndex index; + nebula::cpp2::SchemaID schemaId; + schemaId.tag_id_ref() = 5; + index.space_id_ref() = i + 1; + index.depend_schema_ref() = std::move(schemaId); + index.fields_ref() = {"col_string", "col_fixed_string_1"}; + req.fulltext_index_name_ref() = folly::stringPrintf("ft_tag_index_space%d", i + 1); + req.index_ref() = std::move(index); + + auto* processor = CreateFTIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + } + } + { + cpp2::ListFTIndexesReq req; + auto* processor = ListFTIndexesProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + auto indexes = resp.get_indexes(); + ASSERT_EQ(2, indexes.size()); + for (auto i = 0u; i < indexes.size(); ++i) { + auto key = folly::stringPrintf("ft_tag_index_space%u", i + 1); + auto iter = indexes.find(key); + ASSERT_NE(indexes.end(), iter); + std::vector fields = {"col_string", "col_fixed_string_1"}; + ASSERT_EQ(fields, iter->second.get_fields()); + ASSERT_EQ(i + 1, iter->second.get_space_id()); + nebula::cpp2::SchemaID schemaId; + schemaId.tag_id_ref() = 5; + ASSERT_EQ(schemaId, iter->second.get_depend_schema()); + } + } + { + for (auto i = 0; i < 2; ++i) { + cpp2::DropFTIndexReq req; + req.space_id_ref() = i + 1; + req.fulltext_index_name_ref() = folly::stringPrintf("ft_tag_index_space%d", i + 1); + auto* processor = DropFTIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + } + } + } } TEST(IndexProcessorTest, DropWithFTIndexTest) { @@ -2282,7 +2349,6 @@ TEST(ProcessorTest, IndexIdInSpaceRangeTest) { ASSERT_EQ(14, resp.get_id().get_index_id()); } } - } // namespace meta } // namespace nebula diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index 3c3ff5aa3ce..d5c628100ba 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -2606,9 +2606,10 @@ TEST(MetaClientTest, DivideZoneTest) { } { std::unordered_map> zoneItems; - std::vector oneHosts = {{"127.0.0.1", 8976}, {"127.0.0.1", 8977}}; + std::vector oneHosts = {{"127.0.0.1", 8976}}; zoneItems.emplace("one_zone_1", std::move(oneHosts)); - std::vector anotherHosts = {{"127.0.0.1", 8978}, {"127.0.0.1", 8979}}; + std::vector anotherHosts = { + {"127.0.0.1", 8977}, {"127.0.0.1", 8978}, {"127.0.0.1", 8979}}; zoneItems.emplace("another_zone_1", std::move(anotherHosts)); auto result = client->divideZone("default_zone", std::move(zoneItems)).get(); EXPECT_TRUE(result.ok()); @@ -2639,6 +2640,51 @@ TEST(MetaClientTest, DivideZoneTest) { ASSERT_EQ("one_zone_1", zones[2]); ASSERT_EQ("another_zone_1", zones[3]); } + { + std::unordered_map> zoneItems; + std::vector oneHosts = {{"127.0.0.1", 8976}}; + zoneItems.emplace("one_zone_1_1", std::move(oneHosts)); + std::vector anotherHosts = {{"127.0.0.1", 8976}}; + zoneItems.emplace("one_zone_1_2", std::move(anotherHosts)); + auto result = client->divideZone("one_zone_1", std::move(zoneItems)).get(); + EXPECT_FALSE(result.ok()); + } + { + std::unordered_map> zoneItems; + std::vector hosts = {{"127.0.0.1", 8977}, {"127.0.0.1", 8978}, {"127.0.0.1", 8979}}; + zoneItems.emplace("another_zone_1_1", std::move(hosts)); + auto result = client->divideZone("another_zone_1", std::move(zoneItems)).get(); + EXPECT_FALSE(result.ok()); + } + { + std::unordered_map> zoneItems; + std::vector oneHosts = {{"127.0.0.1", 8977}}; + zoneItems.emplace("another_zone_1", std::move(oneHosts)); + std::vector anotherHosts = {{"127.0.0.1", 8978}, {"127.0.0.1", 8979}}; + zoneItems.emplace("another_zone_1_1", std::move(anotherHosts)); + auto result = client->divideZone("another_zone_1", std::move(zoneItems)).get(); + EXPECT_TRUE(result.ok()); + } + { + auto result = client->listZones().get(); + ASSERT_TRUE(result.ok()); + auto zones = result.value(); + ASSERT_EQ(5, zones.size()); + ASSERT_EQ("another_zone", zones[0].get_zone_name()); + ASSERT_EQ("another_zone_1", zones[1].get_zone_name()); + ASSERT_EQ("another_zone_1_1", zones[2].get_zone_name()); + ASSERT_EQ("one_zone", zones[3].get_zone_name()); + ASSERT_EQ("one_zone_1", zones[4].get_zone_name()); + } + { + std::unordered_map> zoneItems; + std::vector oneHosts = {{"127.0.0.1", 8977}}; + zoneItems.emplace("another_zone_1", std::move(oneHosts)); + std::vector anotherHosts = {{"127.0.0.1", 8978}, {"127.0.0.1", 8979}}; + zoneItems.emplace("another_zone_1", std::move(anotherHosts)); + auto result = client->divideZone("another_zone_1", std::move(zoneItems)).get(); + EXPECT_FALSE(result.ok()); + } cluster.stop(); } diff --git a/src/meta/test/ProcessorTest.cpp b/src/meta/test/ProcessorTest.cpp index 58bce373c42..c6028f01801 100644 --- a/src/meta/test/ProcessorTest.cpp +++ b/src/meta/test/ProcessorTest.cpp @@ -4192,7 +4192,9 @@ TEST(ProcessorTest, DivideZoneTest) { ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); ASSERT_EQ(2, resp.get_zones().size()); ASSERT_EQ("another_zone", resp.get_zones()[0].get_zone_name()); + ASSERT_EQ(2, resp.get_zones()[0].get_nodes().size()); ASSERT_EQ("one_zone", resp.get_zones()[1].get_zone_name()); + ASSERT_EQ(2, resp.get_zones()[1].get_nodes().size()); } { cpp2::AddHostsIntoZoneReq req; @@ -4275,9 +4277,10 @@ TEST(ProcessorTest, DivideZoneTest) { cpp2::DivideZoneReq req; req.zone_name_ref() = "default_zone"; std::unordered_map> zoneItems; - std::vector oneHosts = {{"127.0.0.1", 8976}, {"127.0.0.1", 8977}}; + std::vector oneHosts = {{"127.0.0.1", 8976}}; zoneItems.emplace("one_zone_1", std::move(oneHosts)); - std::vector anotherHosts = {{"127.0.0.1", 8978}, {"127.0.0.1", 8979}}; + std::vector anotherHosts = { + {"127.0.0.1", 8977}, {"127.0.0.1", 8978}, {"127.0.0.1", 8979}}; zoneItems.emplace("another_zone_1", std::move(anotherHosts)); req.zone_items_ref() = std::move(zoneItems); auto* processor = DivideZoneProcessor::instance(kv.get()); @@ -4295,9 +4298,85 @@ TEST(ProcessorTest, DivideZoneTest) { ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); ASSERT_EQ(4, resp.get_zones().size()); ASSERT_EQ("another_zone", resp.get_zones()[0].get_zone_name()); + ASSERT_EQ(2, resp.get_zones()[0].get_nodes().size()); ASSERT_EQ("another_zone_1", resp.get_zones()[1].get_zone_name()); + ASSERT_EQ(3, resp.get_zones()[1].get_nodes().size()); ASSERT_EQ("one_zone", resp.get_zones()[2].get_zone_name()); + ASSERT_EQ(2, resp.get_zones()[2].get_nodes().size()); ASSERT_EQ("one_zone_1", resp.get_zones()[3].get_zone_name()); + ASSERT_EQ(1, resp.get_zones()[3].get_nodes().size()); + } + { + cpp2::DivideZoneReq req; + req.zone_name_ref() = "one_zone_1"; + std::unordered_map> zoneItems; + std::vector oneHosts = {{"127.0.0.1", 8976}}; + zoneItems.emplace("one_zone_1_1", std::move(oneHosts)); + std::vector anotherHosts = {{"127.0.0.1", 8976}}; + zoneItems.emplace("one_zone_1_2", std::move(anotherHosts)); + req.zone_items_ref() = std::move(zoneItems); + auto* processor = DivideZoneProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_INVALID_PARM, resp.get_code()); + } + { + cpp2::DivideZoneReq req; + req.zone_name_ref() = "another_zone_1"; + std::unordered_map> zoneItems; + std::vector hosts = {{"127.0.0.1", 8977}, {"127.0.0.1", 8978}, {"127.0.0.1", 8979}}; + zoneItems.emplace("another_zone_1_1", std::move(hosts)); + req.zone_items_ref() = std::move(zoneItems); + auto* processor = DivideZoneProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_INVALID_PARM, resp.get_code()); + } + { + cpp2::DivideZoneReq req; + req.zone_name_ref() = "another_zone_1"; + std::unordered_map> zoneItems; + std::vector oneHosts = {{"127.0.0.1", 8977}}; + zoneItems.emplace("another_zone_1", std::move(oneHosts)); + std::vector anotherHosts = {{"127.0.0.1", 8978}, {"127.0.0.1", 8979}}; + zoneItems.emplace("another_zone_1_1", std::move(anotherHosts)); + req.zone_items_ref() = std::move(zoneItems); + auto* processor = DivideZoneProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + } + { + cpp2::ListZonesReq req; + auto* processor = ListZonesProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + ASSERT_EQ(5, resp.get_zones().size()); + ASSERT_EQ("another_zone", resp.get_zones()[0].get_zone_name()); + ASSERT_EQ("another_zone_1", resp.get_zones()[1].get_zone_name()); + ASSERT_EQ("another_zone_1_1", resp.get_zones()[2].get_zone_name()); + ASSERT_EQ("one_zone", resp.get_zones()[3].get_zone_name()); + ASSERT_EQ("one_zone_1", resp.get_zones()[4].get_zone_name()); + } + { + cpp2::DivideZoneReq req; + req.zone_name_ref() = "another_zone_1"; + std::unordered_map> zoneItems; + std::vector oneHosts = {{"127.0.0.1", 8977}}; + zoneItems.emplace("another_zone_1", std::move(oneHosts)); + std::vector anotherHosts = {{"127.0.0.1", 8978}, {"127.0.0.1", 8979}}; + zoneItems.emplace("another_zone_1", std::move(anotherHosts)); + req.zone_items_ref() = std::move(zoneItems); + auto* processor = DivideZoneProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_INVALID_PARM, resp.get_code()); } } diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index ba95e4bcbf7..8e08afdfd89 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -175,16 +175,26 @@ class TestUtils { GraphSpaceID id, int32_t partitionNum, int32_t replica = 1, - int32_t totalHost = 1) { + int32_t totalHost = 1, + bool multispace = false) { // mock the part distribution like create space cpp2::SpaceDesc properties; - properties.space_name_ref() = "test_space"; + if (multispace) { + properties.space_name_ref() = folly::stringPrintf("test_space_%d", id); + } else { + properties.space_name_ref() = "test_space"; + } properties.partition_num_ref() = partitionNum; properties.replica_factor_ref() = replica; auto spaceVal = MetaKeyUtils::spaceVal(properties); std::vector data; - data.emplace_back(MetaKeyUtils::indexSpaceKey("test_space"), - std::string(reinterpret_cast(&id), sizeof(GraphSpaceID))); + if (multispace) { + data.emplace_back(MetaKeyUtils::indexSpaceKey(folly::stringPrintf("test_space_%d", id)), + std::string(reinterpret_cast(&id), sizeof(GraphSpaceID))); + } else { + data.emplace_back(MetaKeyUtils::indexSpaceKey("test_space"), + std::string(reinterpret_cast(&id), sizeof(GraphSpaceID))); + } data.emplace_back(MetaKeyUtils::spaceKey(id), MetaKeyUtils::spaceVal(properties)); std::vector allHosts; @@ -208,6 +218,33 @@ class TestUtils { baton.wait(); } + static void addZoneToSpace(kvstore::KVStore* kv, + GraphSpaceID id, + const std::vector& zones) { + std::string spaceKey = MetaKeyUtils::spaceKey(id); + std::string spaceVal; + kv->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &spaceVal); + meta::cpp2::SpaceDesc properties = MetaKeyUtils::parseSpace(spaceVal); + std::vector curZones = properties.get_zone_names(); + curZones.insert(curZones.end(), zones.begin(), zones.end()); + properties.zone_names_ref() = curZones; + std::vector data; + data.emplace_back(MetaKeyUtils::spaceKey(id), MetaKeyUtils::spaceVal(properties)); + folly::Baton baton; + auto ret = nebula::cpp2::ErrorCode::SUCCEEDED; + kv->asyncMultiPut(kDefaultSpaceId, + kDefaultPartId, + std::move(data), + [&ret, &baton](nebula::cpp2::ErrorCode code) { + if (nebula::cpp2::ErrorCode::SUCCEEDED != code) { + ret = code; + LOG(INFO) << "Put data error on meta server"; + } + baton.post(); + }); + baton.wait(); + } + static void assembleSpaceWithZone(kvstore::KVStore* kv, GraphSpaceID id, int32_t partitionNum, diff --git a/src/storage/test/IndexTestUtil.h b/src/storage/test/IndexTestUtil.h index df2f831b91c..09c5747dfa3 100644 --- a/src/storage/test/IndexTestUtil.h +++ b/src/storage/test/IndexTestUtil.h @@ -70,6 +70,21 @@ class MockKVStore : public ::nebula::kvstore::KVStore { CHECK(false); return nebula::cpp2::ErrorCode::SUCCEEDED; } + + const void* GetSnapshot(GraphSpaceID spaceId, + PartitionID partID, + bool canReadFromFollower = false) override { + UNUSED(spaceId); + UNUSED(partID); + UNUSED(canReadFromFollower); + return nullptr; + } + void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) override { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(snapshot); + return; + } // Read a single key nebula::cpp2::ErrorCode get(GraphSpaceID spaceId, PartitionID partId, @@ -154,10 +169,12 @@ class MockKVStore : public ::nebula::kvstore::KVStore { PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) override { + bool canReadFromFollower = false, + const void* snapshot = nullptr) override { UNUSED(canReadFromFollower); UNUSED(spaceId); UNUSED(partId); + UNUSED(snapshot); // Pity that mock kv don't have snap. CHECK_EQ(spaceId, spaceId_); auto mockIter = std::make_unique(kv_, kv_.lower_bound(prefix)); mockIter->setValidFunc([prefix](const decltype(kv_)::iterator& it) { diff --git a/src/storage/test/StorageHttpPropertyHandlerTest.cpp b/src/storage/test/StorageHttpPropertyHandlerTest.cpp index 5d3729eda17..4bc387f6dc7 100644 --- a/src/storage/test/StorageHttpPropertyHandlerTest.cpp +++ b/src/storage/test/StorageHttpPropertyHandlerTest.cpp @@ -77,28 +77,12 @@ TEST(StorageHttpPropertyHandlerTest, ValidRequest) { { std::string expect = R"([ - { - "Engine 0": "96", - "Engine 1": "96" - } -])"; - EXPECT_EQ(expect, request("/rocksdb_property?space=1&property=rocksdb.block-cache-usage")); - } - { - std::string expect = - R"([ - { - "Engine 0": "96", - "Engine 1": "96" - }, { "Engine 0": "0", "Engine 1": "0" } ])"; - EXPECT_EQ(expect, - request("/rocksdb_property?space=1&property=" - "rocksdb.block-cache-usage,rocksdb.is-write-stopped")); + EXPECT_EQ(expect, request("/rocksdb_property?space=1&property=rocksdb.is-write-stopped")); } } diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp index daedb8451ba..1666f24aa7a 100644 --- a/src/tools/db-upgrade/DbUpgrader.cpp +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -910,13 +910,17 @@ void UpgraderSpace::runPartV3() { } return; } + int64_t ingestFileCount = 0; auto write_sst = [&, this](const std::vector& data) { ::rocksdb::Options option; option.create_if_missing = true; option.compression = ::rocksdb::CompressionType::kNoCompression; ::rocksdb::SstFileWriter sst_file_writer(::rocksdb::EnvOptions(), option); - std::string file = ::fmt::format( - ".nebula_upgrade.space-{}.part-{}.{}.sst", spaceId_, partId, std::time(nullptr)); + std::string file = ::fmt::format(".nebula_upgrade.space-{}.part-{}-{}-{}.sst", + spaceId_, + partId, + ingestFileCount++, + std::time(nullptr)); ::rocksdb::Status s = sst_file_writer.Open(file); if (!s.ok()) { LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp index f6105094f4f..e7e7033e389 100644 --- a/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp +++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp @@ -15,7 +15,7 @@ std::string NebulaKeyUtilsV3::partTagPrefix(PartitionID partId) { } std::string NebulaKeyUtilsV3::getVertexKey(folly::StringPiece tagKey) { std::string key = tagKey.toString(); - key[3] = static_cast(kVertex); + key[0] = static_cast(kVertex); key.resize(key.size() - sizeof(TagID)); return key; } diff --git a/tests/admin/test_permission.py b/tests/admin/test_permission.py index 471e5645237..990e0972d6e 100644 --- a/tests/admin/test_permission.py +++ b/tests/admin/test_permission.py @@ -6,7 +6,7 @@ import time -from nebula2.common import ttypes +from nebula3.common import ttypes from tests.common.nebula_test_suite import NebulaTestSuite diff --git a/tests/admin/test_show_hosts.py b/tests/admin/test_show_hosts.py index 3392cb0760c..49fe6e465e7 100644 --- a/tests/admin/test_show_hosts.py +++ b/tests/admin/test_show_hosts.py @@ -26,13 +26,15 @@ def test_show_hosts(self): 'Status', 'Leader count', 'Leader distribution', - 'Partition distribution'] + 'Partition distribution', + 'Version'] expected_result_format = [[re.compile(r'\S+'), re.compile(r'\d+'), re.compile(r'ONLINE|OFFLINE'), re.compile(r'\d+'), re.compile(r'No valid partition|(\S+:\d+, )*\S+:\d+'), - re.compile(r'No valid partition|(\S+:\d+, )*\S+:\d+')]] + re.compile(r'No valid partition|(\S+:\d+, )*\S+:\d+'), + re.compile(r'(^$)|(v\d+\.\d+\.\d+)')]] resp = self.execute(query) self.check_resp_succeeded(resp) self.check_column_names(resp, expected_column_names) diff --git a/tests/common/comparator.py b/tests/common/comparator.py index b77870fe4f4..9fb42ddf85a 100644 --- a/tests/common/comparator.py +++ b/tests/common/comparator.py @@ -7,7 +7,7 @@ from enum import Enum from typing import Union, Dict, List -from nebula2.common.ttypes import ( +from nebula3.common.ttypes import ( DataSet, Edge, Path, diff --git a/tests/common/dataset_printer.py b/tests/common/dataset_printer.py index 12c09161293..3c4e5e420c8 100644 --- a/tests/common/dataset_printer.py +++ b/tests/common/dataset_printer.py @@ -6,7 +6,7 @@ from typing import List, Union -from nebula2.common.ttypes import DataSet, Edge, NullType, Path, Value, Vertex +from nebula3.common.ttypes import DataSet, Edge, NullType, Path, Value, Vertex Pattern = type(re.compile(r'\d+')) diff --git a/tests/common/nebula_service.py b/tests/common/nebula_service.py index 1958ca15086..c7d0495ad7d 100644 --- a/tests/common/nebula_service.py +++ b/tests/common/nebula_service.py @@ -20,8 +20,8 @@ from tests.common.constants import TMP_DIR from tests.common.utils import get_ssl_config -from nebula2.gclient.net import ConnectionPool -from nebula2.Config import Config +from nebula3.gclient.net import ConnectionPool +from nebula3.Config import Config NEBULA_START_COMMAND_FORMAT = "bin/nebula-{} --flagfile conf/nebula-{}.conf {}" diff --git a/tests/common/nebula_test_suite.py b/tests/common/nebula_test_suite.py index 1ff82bf2859..ce7c217ca25 100644 --- a/tests/common/nebula_test_suite.py +++ b/tests/common/nebula_test_suite.py @@ -11,10 +11,10 @@ from pathlib import Path from typing import Pattern, Set -from nebula2.common import ttypes as CommonTtypes -# from nebula2.gclient.net import ConnectionPool -# from nebula2.Config import Config -from nebula2.graph import ttypes +from nebula3.common import ttypes as CommonTtypes +# from nebula3.gclient.net import ConnectionPool +# from nebula3.Config import Config +from nebula3.graph import ttypes from tests.common.configs import get_delay_time from tests.common.utils import ( compare_value, diff --git a/tests/common/path_value.py b/tests/common/path_value.py index 68288e76274..50d1f1bc4dd 100644 --- a/tests/common/path_value.py +++ b/tests/common/path_value.py @@ -4,7 +4,7 @@ # # This source code is licensed under Apache 2.0 License. -from nebula2.common import ttypes as CommonTtypes +from nebula3.common import ttypes as CommonTtypes class PathVal: diff --git a/tests/common/utils.py b/tests/common/utils.py index 75323d0b83b..894e8ca91b3 100644 --- a/tests/common/utils.py +++ b/tests/common/utils.py @@ -13,10 +13,10 @@ import yaml from typing import Pattern -from nebula2.Config import Config, SSL_config -from nebula2.common import ttypes as CommonTtypes -from nebula2.gclient.net import Session -from nebula2.gclient.net import ConnectionPool +from nebula3.Config import Config, SSL_config +from nebula3.common import ttypes as CommonTtypes +from nebula3.gclient.net import Session +from nebula3.gclient.net import ConnectionPool from tests.common.constants import NB_TMP_PATH, NEBULA_HOME from tests.common.csv_import import CSVImporter diff --git a/tests/conftest.py b/tests/conftest.py index 4f79d1de866..c23d24a2401 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,11 +15,11 @@ from tests.common.constants import NB_TMP_PATH, SPACE_TMP_PATH, BUILD_DIR, NEBULA_HOME from tests.common.nebula_service import NebulaService -from nebula2.fbthrift.transport import TSocket -from nebula2.fbthrift.transport import TTransport -from nebula2.fbthrift.protocol import TBinaryProtocol -from nebula2.gclient.net import Connection -from nebula2.graph import GraphService +from nebula3.fbthrift.transport import TSocket +from nebula3.fbthrift.transport import TTransport +from nebula3.fbthrift.protocol import TBinaryProtocol +from nebula3.gclient.net import Connection +from nebula3.graph import GraphService tests_collected = set() diff --git a/tests/job/test_session.py b/tests/job/test_session.py index c08605fa919..5da0687c5bd 100644 --- a/tests/job/test_session.py +++ b/tests/job/test_session.py @@ -10,15 +10,15 @@ import pytest import concurrent -from nebula2.fbthrift.transport import TSocket -from nebula2.fbthrift.transport import TTransport -from nebula2.fbthrift.protocol import TBinaryProtocol +from nebula3.fbthrift.transport import TSocket +from nebula3.fbthrift.transport import TTransport +from nebula3.fbthrift.protocol import TBinaryProtocol -from nebula2.gclient.net import Connection -from nebula2.graph import GraphService -from nebula2.common import ttypes -from nebula2.data.ResultSet import ResultSet +from nebula3.gclient.net import Connection +from nebula3.graph import GraphService +from nebula3.common import ttypes +from nebula3.data.ResultSet import ResultSet from tests.common.nebula_test_suite import NebulaTestSuite class TestSession(NebulaTestSuite): diff --git a/tests/query/stateless/test_admin.py b/tests/query/stateless/test_admin.py index 82f657ba857..87f22dee05e 100644 --- a/tests/query/stateless/test_admin.py +++ b/tests/query/stateless/test_admin.py @@ -4,6 +4,7 @@ # # This source code is licensed under Apache 2.0 License. +import pytest import time from tests.common.nebula_test_suite import NebulaTestSuite @@ -22,6 +23,7 @@ def prepare(self): def cleanup(self): pass + @pytest.mark.skip(reason="The change of minloglevel will influence case in test_configs.py") def test_config(self): ''' @brief Testing about configuration query diff --git a/tests/query/stateless/test_if_exists.py b/tests/query/stateless/test_if_exists.py index ffd075d25b6..1555f5cc0a1 100644 --- a/tests/query/stateless/test_if_exists.py +++ b/tests/query/stateless/test_if_exists.py @@ -9,7 +9,7 @@ import pytest import time -from nebula2.graph import ttypes +from nebula3.graph import ttypes from tests.common.nebula_test_suite import NebulaTestSuite diff --git a/tests/query/stateless/test_keyword.py b/tests/query/stateless/test_keyword.py index 0c970615f69..ec3ed0aac44 100644 --- a/tests/query/stateless/test_keyword.py +++ b/tests/query/stateless/test_keyword.py @@ -9,7 +9,7 @@ import pytest import time -from nebula2.graph import ttypes +from nebula3.graph import ttypes from tests.common.nebula_test_suite import NebulaTestSuite class TestReservedKeyword(NebulaTestSuite): diff --git a/tests/query/stateless/test_range.py b/tests/query/stateless/test_range.py index 60085e3b20a..73ee5490736 100644 --- a/tests/query/stateless/test_range.py +++ b/tests/query/stateless/test_range.py @@ -10,7 +10,7 @@ import pytest -from nebula2.common import ttypes +from nebula3.common import ttypes from tests.common.nebula_test_suite import NebulaTestSuite diff --git a/tests/tck/conftest.py b/tests/tck/conftest.py index f2abc3e57a1..4daaf78830c 100644 --- a/tests/tck/conftest.py +++ b/tests/tck/conftest.py @@ -12,9 +12,9 @@ import threading import json -from nebula2.common.ttypes import NList, NMap, Value, ErrorCode -from nebula2.data.DataObject import ValueWrapper -from nebula2.Exception import AuthFailedException +from nebula3.common.ttypes import NList, NMap, Value, ErrorCode +from nebula3.data.DataObject import ValueWrapper +from nebula3.Exception import AuthFailedException from pytest_bdd import given, parsers, then, when from tests.common.dataset_printer import DataSetPrinter @@ -37,8 +37,8 @@ from tests.tck.utils.table import dataset, table from tests.tck.utils.nbv import murmurhash2 -from nebula2.graph.ttypes import VerifyClientVersionReq -from nebula2.graph.ttypes import VerifyClientVersionResp +from nebula3.graph.ttypes import VerifyClientVersionReq +from nebula3.graph.ttypes import VerifyClientVersionResp parse = functools.partial(parsers.parse) rparse = functools.partial(parsers.re) diff --git a/tests/tck/features/admin/Hosts.feature b/tests/tck/features/admin/Hosts.feature index 0d1b629cb31..2c473726bdd 100644 --- a/tests/tck/features/admin/Hosts.feature +++ b/tests/tck/features/admin/Hosts.feature @@ -9,8 +9,8 @@ Feature: Admin hosts SHOW HOSTS; """ Then the result should contain: - | Host | Port | Status | Leader count | Leader distribution | Partition distribution | - | /\w+/ | /\d+/ | "ONLINE" | /\d+/ | /.*/ | /.*/ | + | Host | Port | Status | Leader count | Leader distribution | Partition distribution | Version | + | /\w+/ | /\d+/ | "ONLINE" | /\d+/ | /.*/ | /.*/ | /.*/ | When executing query: """ SHOW HOSTS GRAPH; diff --git a/tests/tck/features/match/MultiQueryParts.feature b/tests/tck/features/match/MultiQueryParts.feature index 7e989932b22..07cdc177de6 100644 --- a/tests/tck/features/match/MultiQueryParts.feature +++ b/tests/tck/features/match/MultiQueryParts.feature @@ -148,6 +148,17 @@ Feature: Multi Query Parts | "Tim Duncan" | "Aron Baynes" | "Pistons" | "Grant Hill" | | "Tim Duncan" | "Aron Baynes" | "Spurs" | "Aron Baynes" | | "Tim Duncan" | "Aron Baynes" | "Spurs" | "Boris Diaw" | + When executing query: + """ + MATCH (v:player{name:"Tony Parker"}) + WITH v AS a + MATCH p=(o:player{name:"Tim Duncan"})-[]->(a) + RETURN o.player.name + """ + Then the result should be, in order: + | o.player.name | + | "Tim Duncan" | + | "Tim Duncan" | Scenario: Optional Match When executing query: diff --git a/tests/tck/features/match/With.feature b/tests/tck/features/match/With.feature index 887d6aa002e..5d8252d8956 100644 --- a/tests/tck/features/match/With.feature +++ b/tests/tck/features/match/With.feature @@ -64,6 +64,38 @@ Feature: With clause Then the result should be, in any order: | count(a) | | 1 | + When executing query: + """ + WITH {a:1, b:{c:3, d:{e:5}}} AS x + RETURN x.b.d.e + """ + Then the result should be, in any order: + | x.b.d.e | + | 5 | + When executing query: + """ + WITH {a:1, b:{c:3, d:{e:5}}} AS x + RETURN x.b.d + """ + Then the result should be, in any order: + | x.b.d | + | {e: 5} | + When executing query: + """ + WITH {a:1, b:{c:3, d:{e:5}}} AS x + RETURN x.b + """ + Then the result should be, in any order: + | x.b | + | {c: 3, d: {e: 5}} | + When executing query: + """ + WITH {a:1, b:{c:3, d:{e:5}}} AS x + RETURN x.c + """ + Then the result should be, in any order: + | x.c | + | UNKNOWN_PROP | Scenario: match with return When executing query: @@ -185,6 +217,23 @@ Feature: With clause Then the result should be, in any order, with relax comparison: | avg | max | | 90.0 | 90 | + When executing query: + """ + MATCH (:player {name:"Tim Duncan"})-[e:like]->(dst) + WITH dst AS b + RETURN b.player.age AS age, b + """ + Then the result should be, in any order, with relax comparison: + | age | b | + | 36 | ("Tony Parker" :player{age: 36, name: "Tony Parker"}) | + | 41 | ("Manu Ginobili" :player{age: 41, name: "Manu Ginobili"}) | + When executing query: + """ + MATCH (:player {name:"Tim Duncan"})-[e:like]->(dst) + WITH dst AS b + RETURN b.age AS age, b + """ + Then a SemanticError should be raised at runtime: To get the property of the vertex in `b.age', should use the format `var.tag.prop' @skip Scenario: with match return diff --git a/tests/tck/features/optimizer/CollapseProjectRule.feature b/tests/tck/features/optimizer/CollapseProjectRule.feature index ea6f339a162..46a572f6274 100644 --- a/tests/tck/features/optimizer/CollapseProjectRule.feature +++ b/tests/tck/features/optimizer/CollapseProjectRule.feature @@ -47,3 +47,20 @@ Feature: Collapse Project Rule | 6 | Project | 5 | | | 5 | TagIndexPrefixScan | 0 | | | 0 | Start | | | + When profiling query: + """ + MATCH (:player {name:"Tim Duncan"})-[e:like]->(dst) + WITH dst AS b + RETURN b.player.age AS age + """ + Then the result should be, in any order: + | age | + | 36 | + | 41 | + And the execution plan should be: + | id | name | dependencies | operator info | + | 11 | Project | 4 | | + | 4 | AppendVertices | 3 | | + | 3 | Traverse | 8 | | + | 8 | IndexScan | 2 | {"indexCtx": {"columnHints":{"scanType":"PREFIX","column":"name","beginValue":"\"Tim Duncan\"","endValue":"__EMPTY__","includeBegin":"true","includeEnd":"false"}}} | + | 2 | Start | | | diff --git a/tests/tck/features/parser/Example.feature b/tests/tck/features/parser/Example.feature index a561ea48cd2..43e4d866206 100644 --- a/tests/tck/features/parser/Example.feature +++ b/tests/tck/features/parser/Example.feature @@ -24,8 +24,8 @@ Feature: Feature examples SHOW HOSTS """ Then the result should contain: - | Host | Port | Status | Leader count | Leader distribution | Partition distribution | - | /\w+/ | /\d+/ | "ONLINE" | /\d+/ | /.*/ | /.*/ | + | Host | Port | Status | Leader count | Leader distribution | Partition distribution | Version | + | /\w+/ | /\d+/ | "ONLINE" | /\d+/ | /.*/ | /.*/ | /.*/ | When executing query: """ SHOW HOSTS diff --git a/tests/tck/steps/conftest.py b/tests/tck/steps/conftest.py index 16fca69f933..8122776bed3 100644 --- a/tests/tck/steps/conftest.py +++ b/tests/tck/steps/conftest.py @@ -8,7 +8,7 @@ then, ) -from nebula2.common.ttypes import Value, NullType +from nebula3.common.ttypes import Value, NullType from tests.tck.utils.nbv import register_function, parse # You could register functions that can be invoked from the parsing text diff --git a/tests/tck/utils/nbv.py b/tests/tck/utils/nbv.py index 75fbbbf4e45..ed694a0ee1f 100644 --- a/tests/tck/utils/nbv.py +++ b/tests/tck/utils/nbv.py @@ -12,7 +12,7 @@ else: from tests.tck.utils.mmh2 import mmh2 -from nebula2.common.ttypes import ( +from nebula3.common.ttypes import ( Value, NullType, NMap, diff --git a/tests/tck/utils/table.py b/tests/tck/utils/table.py index 3a8e0d1d2b7..7505c2a30ff 100644 --- a/tests/tck/utils/table.py +++ b/tests/tck/utils/table.py @@ -7,7 +7,7 @@ import re from tests.tck.utils.nbv import parse -from nebula2.common.ttypes import DataSet, Row, Value +from nebula3.common.ttypes import DataSet, Row, Value pattern = re.compile(r"^<\[(\w+)\]>$")