Skip to content

Commit

Permalink
Merge branch 'fix/extend-white-space-2' of github.com:Shylock-Hg/nebu…
Browse files Browse the repository at this point in the history
…la into fix/extend-white-space-2
  • Loading branch information
Shylock-Hg committed Jan 6, 2023
2 parents 46217de + 08cbc3e commit 238bbff
Show file tree
Hide file tree
Showing 24 changed files with 1,780 additions and 113 deletions.
4 changes: 4 additions & 0 deletions conf/nebula-graphd.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,7 @@

# if use balance data feature, only work if enable_experimental_feature is true
--enable_data_balance=true

########## session ##########
# Maximum number of sessions that can be created per IP and per user
--max_sessions_per_ip_per_user=300
9 changes: 9 additions & 0 deletions src/common/function/FunctionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1867,6 +1867,15 @@ FunctionManager::FunctionManager() {
case Value::Type::EDGE: {
return value.getEdge().id();
}
// The root cause is the edge-type data format of Traverse executor
case Value::Type::LIST: {
auto &edges = value.getList().values;
if (edges.size() == 1 && edges[0].isEdge()) {
return edges[0].getEdge().id();
} else {
return args[0];
}
}
default: {
// Join on the origin type
return args[0];
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/mutate/DeleteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ folly::Future<Status> DeleteEdgesExecutor::deleteEdges() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature && FLAGS_enable_toss;
param.useExperimentalFeature = false;
return qctx()
->getStorageClient()
->deleteEdges(param, std::move(edgeKeys))
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/mutate/InsertExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ folly::Future<Status> InsertEdgesExecutor::insertEdges() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
ieNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature && FLAGS_enable_toss;
param.useExperimentalFeature = false;
return qctx()
->getStorageClient()
->addEdges(param,
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/mutate/UpdateExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ folly::Future<Status> UpdateEdgeExecutor::execute() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
ueNode->getSpaceId(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature && FLAGS_enable_toss;
param.useExperimentalFeature = false;
return qctx()
->getStorageClient()
->updateEdge(param,
Expand Down
4 changes: 2 additions & 2 deletions src/graph/planner/match/SegmentsConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ SubPlan SegmentsConnector::rollUpApply(CypherClauseContextBase* ctx,
std::vector<Expression*> compareProps;
for (const auto& col : path.compareVariables) {
compareProps.emplace_back(FunctionCallExpression::make(
qctx->objPool(), "id", {InputPropertyExpression::make(qctx->objPool(), col)}));
qctx->objPool(), "_joinkey", {InputPropertyExpression::make(qctx->objPool(), col)}));
}
InputPropertyExpression* collectProp = InputPropertyExpression::make(qctx->objPool(), collectCol);
auto* rollUpApply = RollUpApply::make(
Expand All @@ -104,7 +104,7 @@ SubPlan SegmentsConnector::rollUpApply(CypherClauseContextBase* ctx,
std::vector<Expression*> keyProps;
for (const auto& col : path.compareVariables) {
keyProps.emplace_back(FunctionCallExpression::make(
qctx->objPool(), "id", {InputPropertyExpression::make(qctx->objPool(), col)}));
qctx->objPool(), "_joinkey", {InputPropertyExpression::make(qctx->objPool(), col)}));
}
auto* patternApply = PatternApply::make(
qctx, left.root, DCHECK_NOTNULL(right.root), std::move(keyProps), path.isAntiPred);
Expand Down
1 change: 0 additions & 1 deletion src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ DEFINE_bool(disable_octal_escape_char,
" in next version to ensure compatibility with cypher.");

DEFINE_bool(enable_experimental_feature, false, "Whether to enable experimental feature");
DEFINE_bool(enable_toss, false, "Whether to enable toss feature");
DEFINE_bool(enable_data_balance, true, "Whether to enable data balance feature");

DEFINE_int32(num_rows_to_check_memory, 1024, "number rows to check memory");
Expand Down
1 change: 0 additions & 1 deletion src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ DECLARE_bool(optimize_appendvertice);
DECLARE_int64(max_allowed_connections);

DECLARE_bool(enable_experimental_feature);
DECLARE_bool(enable_toss);
DECLARE_bool(enable_data_balance);

DECLARE_bool(enable_client_white_list);
Expand Down
19 changes: 15 additions & 4 deletions src/graph/validator/MatchValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,7 @@ Status MatchValidator::validatePathInWhere(
matchPath.alias()->c_str());
}
if (find->second != AliasType::kPath) {
return Status::SemanticError("Alias `%s' should be Path, but got type '%s",
return Status::SemanticError("`%s' is defined with type %s, but referenced with type Path",
matchPath.alias()->c_str(),
AliasTypeName::get(find->second).c_str());
}
Expand All @@ -1258,7 +1258,7 @@ Status MatchValidator::validatePathInWhere(
node->alias().c_str());
}
if (find->second != AliasType::kNode) {
return Status::SemanticError("Alias `%s' should be Node, but got type '%s",
return Status::SemanticError("`%s' is defined with type %s, but referenced with type Node",
node->alias().c_str(),
AliasTypeName::get(find->second).c_str());
}
Expand All @@ -1272,11 +1272,17 @@ Status MatchValidator::validatePathInWhere(
"PatternExpression are not allowed to introduce new variables: `%s'.",
edge->alias().c_str());
}
if (find->second != AliasType::kEdge) {
return Status::SemanticError("Alias `%s' should be Edge, but got type '%s'",
if (!edge->range() && find->second != AliasType::kEdge) {
return Status::SemanticError("`%s' is defined with type %s, but referenced with type Edge",
edge->alias().c_str(),
AliasTypeName::get(find->second).c_str());
}
if (edge->range() && find->second != AliasType::kEdgeList) {
return Status::SemanticError(
"`%s' is defined with type %s, but referenced with type EdgeList",
edge->alias().c_str(),
AliasTypeName::get(find->second).c_str());
}
}
}
return Status::OK();
Expand All @@ -1290,6 +1296,11 @@ Status MatchValidator::validatePathInWhere(
pathInfo.compareVariables.emplace_back(node->alias());
}
}
for (const auto &edge : path->edges()) {
if (edge->alias()[0] != '_') {
pathInfo.compareVariables.emplace_back(edge->alias());
}
}
pathInfo.collectVariable = *path->alias();
pathInfo.rollUpApply = true;
return Status::OK();
Expand Down
40 changes: 17 additions & 23 deletions src/graph/validator/MutateValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ Status InsertEdgesValidator::check() {
// Check validity of vertices data.
// Check edge key type, check properties value, fill to NewEdge structure.
Status InsertEdgesValidator::prepareEdges() {
auto size =
FLAGS_enable_experimental_feature && FLAGS_enable_toss ? rows_.size() : rows_.size() * 2;
auto size = rows_.size() * 2;
edges_.reserve(size);

size_t fieldNum = schema_->getNumFields();
Expand Down Expand Up @@ -297,7 +296,7 @@ Status InsertEdgesValidator::prepareEdges() {
edge.key_ref() = key;
edge.props_ref() = std::move(entirePropValues);
edges_.emplace_back(edge);
if (!(FLAGS_enable_experimental_feature && FLAGS_enable_toss)) {
{
// inbound
key.src_ref() = dstId;
key.dst_ref() = srcId;
Expand Down Expand Up @@ -892,26 +891,21 @@ Status UpdateEdgeValidator::toPlan() {
{},
condition_,
{});
if ((FLAGS_enable_experimental_feature && FLAGS_enable_toss)) {
root_ = outNode;
tail_ = root_;
} else {
auto *inNode = UpdateEdge::make(qctx_,
outNode,
spaceId_,
std::move(name_),
std::move(dstId_),
std::move(srcId_),
-edgeType_,
rank_,
insertable_,
std::move(updatedProps_),
std::move(returnProps_),
std::move(condition_),
std::move(yieldColNames_));
root_ = inNode;
tail_ = outNode;
}
auto *inNode = UpdateEdge::make(qctx_,
outNode,
spaceId_,
std::move(name_),
std::move(dstId_),
std::move(srcId_),
-edgeType_,
rank_,
insertable_,
std::move(updatedProps_),
std::move(returnProps_),
std::move(condition_),
std::move(yieldColNames_));
root_ = inNode;
tail_ = outNode;
return Status::OK();
}

Expand Down
14 changes: 7 additions & 7 deletions src/meta/processors/index/FTIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,6 @@ void DropFTIndexProcessor::process(const cpp2::DropFTIndexReq& req) {
return;
}

auto batchHolder = std::make_unique<kvstore::BatchHolder>();
batchHolder->remove(std::move(indexKey));
auto timeInMilliSec = time::WallClock::fastNowInMilliSec();
LastUpdateTimeMan::update(batchHolder.get(), timeInMilliSec);
auto batch = encodeBatchValue(std::move(batchHolder)->getBatch());
doBatchOperation(std::move(batch));

const auto& serviceKey = MetaKeyUtils::serviceKey(cpp2::ExternalServiceType::ELASTICSEARCH);
auto getRet = doGet(serviceKey);
if (!nebula::ok(getRet)) {
Expand Down Expand Up @@ -213,6 +206,13 @@ void DropFTIndexProcessor::process(const cpp2::DropFTIndexReq& req) {
onFinished();
return;
}

auto batchHolder = std::make_unique<kvstore::BatchHolder>();
batchHolder->remove(std::move(indexKey));
auto timeInMilliSec = time::WallClock::fastNowInMilliSec();
LastUpdateTimeMan::update(batchHolder.get(), timeInMilliSec);
auto batch = encodeBatchValue(std::move(batchHolder)->getBatch());
doBatchOperation(std::move(batch));
}

void ListFTIndexesProcessor::process(const cpp2::ListFTIndexesReq&) {
Expand Down
6 changes: 6 additions & 0 deletions src/meta/processors/job/JobDescription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <stdexcept>

#include "common/utils/MetaKeyUtils.h"
#include "interface/gen-cpp2/meta_types.h"
#include "kvstore/KVIterator.h"
#include "meta/processors/Common.h"

Expand Down Expand Up @@ -83,6 +84,11 @@ cpp2::JobDesc JobDescription::toJobDesc() {
}

bool JobDescription::setStatus(Status newStatus, bool force) {
if (JobStatus::notSetable(status_)) {
// no need to change time.
return status_ == newStatus;
}

if (JobStatus::laterThan(status_, newStatus) && !force) {
return false;
}
Expand Down
9 changes: 5 additions & 4 deletions src/meta/processors/job/JobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace meta {
class JobExecutor {
public:
JobExecutor() = default;
explicit JobExecutor(kvstore::KVStore* kv, GraphSpaceID space) : kvstore_(kv), space_(space) {}
JobExecutor(kvstore::KVStore* kv, GraphSpaceID space) : kvstore_(kv), space_(space) {}
virtual ~JobExecutor() = default;

/**
Expand Down Expand Up @@ -99,9 +99,10 @@ class JobExecutor {

class JobExecutorFactory {
public:
static std::unique_ptr<JobExecutor> createJobExecutor(const JobDescription& jd,
kvstore::KVStore* store,
AdminClient* client);
virtual ~JobExecutorFactory() = default;
virtual std::unique_ptr<JobExecutor> createJobExecutor(const JobDescription& jd,
kvstore::KVStore* store,
AdminClient* client);
};

} // namespace meta
Expand Down
31 changes: 21 additions & 10 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <thrift/lib/cpp/util/EnumUtils.h>

#include <boost/stacktrace.hpp>
#include <memory>

#include "common/stats/StatsManager.h"
#include "common/time/WallClock.h"
Expand Down Expand Up @@ -42,7 +43,10 @@ JobManager* JobManager::getInstance() {
return &inst;
}

bool JobManager::init(nebula::kvstore::KVStore* store, AdminClient* adminClient) {
bool JobManager::init(nebula::kvstore::KVStore* store,
AdminClient* adminClient,
std::shared_ptr<JobExecutorFactory> factory) {
executorFactory_ = factory;
adminClient_ = adminClient;
if (store == nullptr) {
return false;
Expand Down Expand Up @@ -97,7 +101,7 @@ nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() {
if (nebula::ok(optJobRet)) {
auto optJob = nebula::value(optJobRet);
std::unique_ptr<JobExecutor> je =
JobExecutorFactory::createJobExecutor(optJob, kvStore_, adminClient_);
executorFactory_->createJobExecutor(optJob, kvStore_, adminClient_);
// Only balance would change
if (optJob.getStatus() == cpp2::JobStatus::RUNNING && je->isMetaJob()) {
jds.emplace_back(std::move(optJob));
Expand Down Expand Up @@ -235,7 +239,7 @@ folly::Future<nebula::cpp2::ErrorCode> JobManager::runJobInternal(const JobDescr
iter = this->muJobFinished_.emplace(spaceId, std::make_unique<std::recursive_mutex>()).first;
}
std::lock_guard<std::recursive_mutex> lk(*(iter->second));
auto je = JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
auto je = executorFactory_->createJobExecutor(jobDesc, kvStore_, adminClient_);
jobExec = je.get();

runningJobs_.emplace(jobDesc.getJobId(), std::move(je));
Expand Down Expand Up @@ -440,7 +444,7 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td,
}

auto optJobDesc = nebula::value(optJobDescRet);
auto jobExec = JobExecutorFactory::createJobExecutor(optJobDesc, kvStore_, adminClient_);
auto jobExec = executorFactory_->createJobExecutor(optJobDesc, kvStore_, adminClient_);

if (!jobExec) {
LOG(INFO) << folly::sformat("createJobExecutor failed(), jobId={}", jobId);
Expand Down Expand Up @@ -682,6 +686,11 @@ bool JobManager::isExpiredJob(JobDescription& jobDesc) {
return false;
}
auto jobStart = jobDesc.getStartTime();
if (jobStart == 0) {
// should not happend, but just in case keep this job
LOG(INFO) << "Job " << jobDesc.getJobId() << " start time is not set, keep it for now";
return false;
}
auto duration = std::difftime(nebula::time::WallClock::fastNowInSec(), jobStart);
return duration > FLAGS_job_expired_secs;
}
Expand Down Expand Up @@ -848,7 +857,9 @@ ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
for (auto& [id, job] : allJobs) {
auto status = job.getStatus();
if (status == cpp2::JobStatus::FAILED || status == cpp2::JobStatus::STOPPED) {
jobsMaybeRecover.emplace(id);
if (!isExpiredJob(job)) {
jobsMaybeRecover.emplace(id);
}
}
}
std::set<JobID>::reverse_iterator lastBalaceJobRecoverIt = jobsMaybeRecover.rend();
Expand All @@ -869,7 +880,8 @@ ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
JobID jid;
bool jobExist = checkOnRunningJobExist(spaceId, job.getJobType(), job.getParas(), jid);
if (!jobExist) {
job.setStatus(cpp2::JobStatus::QUEUE, true);
job.setStatus(cpp2::JobStatus::QUEUE, true); // which cause the job execute again
job.setErrorCode(nebula::cpp2::ErrorCode::E_JOB_SUBMITTED);
auto jobKey = MetaKeyUtils::jobKey(job.getSpace(), jobId);
auto jobVal = MetaKeyUtils::jobVal(job.getJobType(),
job.getParas(),
Expand Down Expand Up @@ -919,9 +931,8 @@ ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
auto jobType = allJobs[*it].getJobType();
if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) {
if (jobIdSet.empty() || jobIdSet.count(*it)) {
LOG(INFO) << "can't recover a balance job " << *lastBalaceJobRecoverIt
<< " when there's a newer balance job " << *lastBalaceJobRecoverIt
<< " stopped or failed";
LOG(INFO) << "can't recover a balance job " << *it << " when there's a newer balance job "
<< *lastBalaceJobRecoverIt << " stopped or failed";
}
it = jobsMaybeRecover.erase(it);
} else {
Expand All @@ -931,7 +942,7 @@ ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
if (*lastBalaceJobRecoverIt < lastBalanceJobFinished) {
if (jobIdSet.empty() || jobIdSet.count(*lastBalaceJobRecoverIt)) {
LOG(INFO) << "can't recover a balance job " << *lastBalaceJobRecoverIt
<< " that before a finished balance job " << lastBalanceJobFinished;
<< " when there's a newer balance job " << lastBalanceJobFinished << " finished";
}
jobsMaybeRecover.erase(*lastBalaceJobRecoverIt);
}
Expand Down
Loading

0 comments on commit 238bbff

Please sign in to comment.