Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Could report error even if there is no task #4444

Merged
merged 1 commit into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/meta/processors/job/BalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ BalanceJobExecutor::BalanceJobExecutor(GraphSpaceID space,
const std::vector<std::string>& paras)
: MetaJobExecutor(space, jobId, kvstore, adminClient, paras) {}

bool BalanceJobExecutor::check() {
return true;
nebula::cpp2::ErrorCode BalanceJobExecutor::check() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode BalanceJobExecutor::prepare() {
Expand Down Expand Up @@ -67,8 +67,7 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::recovery() {
plan_.reset(nullptr);
return recRet;
}
plan_->saveInStore();
return nebula::cpp2::ErrorCode::SUCCEEDED;
return plan_->saveInStore();
}

nebula::cpp2::ErrorCode BalanceJobExecutor::finish(bool) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/BalanceJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class BalanceJobExecutor : public MetaJobExecutor {
*
* @return
*/
bool check() override;
nebula::cpp2::ErrorCode check() override;

/**
* @brief See implementation in child class
Expand Down
14 changes: 7 additions & 7 deletions src/meta/processors/job/DownloadJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ DownloadJobExecutor::DownloadJobExecutor(GraphSpaceID space,
helper_ = std::make_unique<nebula::hdfs::HdfsCommandHelper>();
}

bool DownloadJobExecutor::check() {
nebula::cpp2::ErrorCode DownloadJobExecutor::check() {
if (paras_.size() != 1) {
return false;
return nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

auto& url = paras_[0];
std::string hdfsPrefix = "hdfs://";
if (url.find(hdfsPrefix) != 0) {
LOG(ERROR) << "URL should start with " << hdfsPrefix;
return false;
return nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

auto u = url.substr(hdfsPrefix.size(), url.size());
Expand All @@ -44,20 +44,20 @@ bool DownloadJobExecutor::check() {
port_ = folly::to<int32_t>(tokens[1].toString().substr(0, position).c_str());
} catch (const std::exception& ex) {
LOG(ERROR) << "URL's port parse failed: " << url;
return false;
return nebula::cpp2::ErrorCode::E_INVALID_JOB;
}
path_ =
std::make_unique<std::string>(tokens[1].toString().substr(position, tokens[1].size()));
} else {
LOG(ERROR) << "URL Parse Failed: " << url;
return false;
return nebula::cpp2::ErrorCode::E_INVALID_JOB;
}
} else {
LOG(ERROR) << "URL Parse Failed: " << url;
return false;
return nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

return true;
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode DownloadJobExecutor::prepare() {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/DownloadJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class DownloadJobExecutor : public SimpleConcurrentJobExecutor {
AdminClient* adminClient,
const std::vector<std::string>& params);

bool check() override;
nebula::cpp2::ErrorCode check() override;

nebula::cpp2::ErrorCode prepare() override;

Expand Down
7 changes: 4 additions & 3 deletions src/meta/processors/job/IngestJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ IngestJobExecutor::IngestJobExecutor(GraphSpaceID space,
const std::vector<std::string>& paras)
: SimpleConcurrentJobExecutor(space, jobId, kvstore, adminClient, paras) {}

bool IngestJobExecutor::check() {
return paras_.empty();
nebula::cpp2::ErrorCode IngestJobExecutor::check() {
return paras_.empty() ? nebula::cpp2::ErrorCode::SUCCEEDED
: nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

nebula::cpp2::ErrorCode IngestJobExecutor::prepare() {
Expand All @@ -36,7 +37,7 @@ folly::Future<Status> IngestJobExecutor::executeInternal(HostAddr&& address,
taskId_++,
space_,
std::move(address),
taskParameters_,
{},
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
Expand Down
5 changes: 1 addition & 4 deletions src/meta/processors/job/IngestJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@ class IngestJobExecutor : public SimpleConcurrentJobExecutor {
AdminClient* adminClient,
const std::vector<std::string>& params);

bool check() override;
nebula::cpp2::ErrorCode check() override;

nebula::cpp2::ErrorCode prepare() override;

folly::Future<Status> executeInternal(HostAddr&& address,
std::vector<PartitionID>&& parts) override;

private:
std::vector<std::string> taskParameters_;
};

} // namespace meta
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/JobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class JobExecutor {
*
* @return
*/
virtual bool check() = 0;
virtual nebula::cpp2::ErrorCode check() = 0;

/**
* @brief Prepare the Job info from the arguments.
Expand Down
92 changes: 53 additions & 39 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,39 +156,41 @@ void JobManager::scheduleThread() {
jobDesc.getErrorCode());
save(jobKey, jobVal);
spaceRunningJobs_.insert_or_assign(spaceId, true);
if (!runJobInternal(jobDesc, jobOp)) {
jobFinished(spaceId, jodId, cpp2::JobStatus::FAILED);
auto code = runJobInternal(jobDesc, jobOp);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
jobFinished(spaceId, jodId, cpp2::JobStatus::FAILED, code);
}
}
}

bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
nebula::cpp2::ErrorCode JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
auto je = JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
JobExecutor* jobExec = je.get();

runningJobs_.emplace(jobDesc.getJobId(), std::move(je));
if (jobExec == nullptr) {
LOG(INFO) << "unreconized job type "
<< apache::thrift::util::enumNameSafe(jobDesc.getJobType());
return false;
return nebula::cpp2::ErrorCode::E_ADD_JOB_FAILURE;
}

if (jobDesc.getStatus() == cpp2::JobStatus::STOPPED) {
jobExec->stop();
return true;
}

if (!jobExec->check()) {
auto code = jobExec->check();
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Job Executor check failed";
return false;
return code;
}

if (jobExec->prepare() != nebula::cpp2::ErrorCode::SUCCEEDED) {
code = jobExec->prepare();
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Job Executor prepare failed";
return false;
return code;
}
if (op == JbOp::RECOVER) {
jobExec->recovery();
code = jobExec->recovery();
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Recover job failed";
return code;
}
}
if (jobExec->isMetaJob()) {
jobExec->setFinishCallBack([this, jobDesc](meta::cpp2::JobStatus status) {
Expand All @@ -206,11 +208,7 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
}
});
}
if (jobExec->execute() != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Job dispatch failed";
return false;
}
return true;
return jobExec->execute();
}

void JobManager::cleanJob(JobID jobId) {
Expand All @@ -226,14 +224,18 @@ void JobManager::cleanJob(JobID jobId) {
}
}

nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId,
JobID jobId,
cpp2::JobStatus jobStatus) {
nebula::cpp2::ErrorCode JobManager::jobFinished(
GraphSpaceID spaceId,
JobID jobId,
cpp2::JobStatus jobStatus,
std::optional<nebula::cpp2::ErrorCode> jobErrorCode) {
LOG(INFO) << folly::sformat("{}, spaceId={}, jobId={}, result={}",
__func__,
spaceId,
jobId,
apache::thrift::util::enumNameSafe(jobStatus));
DCHECK(jobStatus == cpp2::JobStatus::FINISHED || jobStatus == cpp2::JobStatus::FAILED ||
jobStatus == cpp2::JobStatus::STOPPED);
// normal job finish may race to job stop
auto mutexIter = muJobFinished_.find(spaceId);
if (mutexIter == muJobFinished_.end()) {
Expand All @@ -260,28 +262,36 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId,
return nebula::cpp2::ErrorCode::E_SAVE_JOB_FAILURE;
}

// Set the errorcode of the job
nebula::cpp2::ErrorCode jobErrCode = nebula::cpp2::ErrorCode::SUCCEEDED;
// If the job is marked as FAILED, one of the following will be triggered
// 1. If any of the task failed, set the errorcode of the job to the failed task code.
// 2. The job failed before running any task (e.g. in check or prepare), the error code of the job
// will be set as it
if (jobStatus == cpp2::JobStatus::FAILED) {
// Traverse the tasks and find the first task errorcode unsuccessful
auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId);
std::unique_ptr<kvstore::KVIterator> iter;
auto rc = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobKey, &iter);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
}
for (; iter->valid(); iter->next()) {
if (MetaKeyUtils::isJobKey(iter->key())) {
continue;
if (!jobErrorCode.has_value()) {
// Traverse the tasks and find the first task errorcode unsuccessful
auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId);
std::unique_ptr<kvstore::KVIterator> iter;
auto rc = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobKey, &iter);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
}
auto tupTaskVal = MetaKeyUtils::parseTaskVal(iter->val());
jobErrCode = std::get<4>(tupTaskVal);
if (jobErrCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
for (; iter->valid(); iter->next()) {
if (MetaKeyUtils::isJobKey(iter->key())) {
continue;
}
auto tupTaskVal = MetaKeyUtils::parseTaskVal(iter->val());
auto taskErrorCode = std::get<4>(tupTaskVal);
if (taskErrorCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
optJobDesc.setErrorCode(taskErrorCode);
break;
}
}
} else {
optJobDesc.setErrorCode(jobErrorCode.value());
}
} else if (jobStatus == cpp2::JobStatus::FINISHED) {
optJobDesc.setErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED);
}
optJobDesc.setErrorCode(jobErrCode);

spaceRunningJobs_.insert_or_assign(spaceId, false);
auto jobKey = MetaKeyUtils::jobKey(optJobDesc.getSpace(), optJobDesc.getJobId());
Expand Down Expand Up @@ -359,6 +369,9 @@ void JobManager::compareChangeStatus(JbmgrStatus expected, JbmgrStatus desired)
status_.compare_exchange_strong(ex, desired, std::memory_order_acq_rel);
}

// Only the job which execute on storaged will trigger this function. Storage will report to meta
// when the task has been executed. In other words, when storage report the task state, it should be
// one of FINISHED, FAILED or STOPPED.
nebula::cpp2::ErrorCode JobManager::reportTaskFinish(const cpp2::ReportTaskReq& req) {
auto spaceId = req.get_space_id();
auto jobId = req.get_job_id();
Expand Down Expand Up @@ -395,6 +408,7 @@ nebula::cpp2::ErrorCode JobManager::reportTaskFinish(const cpp2::ReportTaskReq&
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

// the status of task will be set as eithor FINISHED or FAILED in saveTaskStatus
auto rc = saveTaskStatus(*task, req);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
Expand Down
14 changes: 10 additions & 4 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,16 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
*
* @param spaceId
* @param jobId
* @param jobStatus
* @param jobStatus Will be one of the FINISHED, FAILED or STOPPED.
* @param jobErrorCode Will be specified when the job failed before any tasks executed, e.g. when
* check or prepare.
* @return cpp2::ErrorCode if error when write to kv store
*/
nebula::cpp2::ErrorCode jobFinished(GraphSpaceID spaceId, JobID jobId, cpp2::JobStatus jobStatus);
nebula::cpp2::ErrorCode jobFinished(GraphSpaceID spaceId,
JobID jobId,
cpp2::JobStatus jobStatus,
std::optional<nebula::cpp2::ErrorCode> jobErrorCode =
std::optional<nebula::cpp2::ErrorCode>());

/**
* @brief Report task finished.
Expand Down Expand Up @@ -228,9 +234,9 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
*
* @param jobDesc
* @param op
* @return true if all task dispatched, else false.
* @return error code
*/
bool runJobInternal(const JobDescription& jobDesc, JbOp op);
nebula::cpp2::ErrorCode runJobInternal(const JobDescription& jobDesc, JbOp op);

ErrorOr<nebula::cpp2::ErrorCode, GraphSpaceID> getSpaceId(const std::string& name);

Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/job/MetaJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ DECLARE_uint32(expired_time_factor);

namespace nebula {
namespace meta {
bool MetaJobExecutor::check() {
return true;
nebula::cpp2::ErrorCode MetaJobExecutor::check() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

// Prepare the Job info from the arguments.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/MetaJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class MetaJobExecutor : public JobExecutor {
*
* @return
*/
bool check() override;
nebula::cpp2::ErrorCode check() override;

/**
* @brief Prepare the Job info from the arguments.
Expand Down
5 changes: 3 additions & 2 deletions src/meta/processors/job/SimpleConcurrentJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ SimpleConcurrentJobExecutor::SimpleConcurrentJobExecutor(GraphSpaceID space,
const std::vector<std::string>& paras)
: StorageJobExecutor(space, jobId, kvstore, adminClient, paras) {}

bool SimpleConcurrentJobExecutor::check() {
return paras_.empty();
nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::check() {
return paras_.empty() ? nebula::cpp2::ErrorCode::SUCCEEDED
: nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/SimpleConcurrentJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class SimpleConcurrentJobExecutor : public StorageJobExecutor {
AdminClient* adminClient,
const std::vector<std::string>& params);

bool check() override;
nebula::cpp2::ErrorCode check() override;

nebula::cpp2::ErrorCode prepare() override;

Expand Down
5 changes: 3 additions & 2 deletions src/meta/processors/job/StatsJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
namespace nebula {
namespace meta {

bool StatsJobExecutor::check() {
return paras_.empty();
nebula::cpp2::ErrorCode StatsJobExecutor::check() {
return paras_.empty() ? nebula::cpp2::ErrorCode::SUCCEEDED
: nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

nebula::cpp2::ErrorCode StatsJobExecutor::save(const std::string& key, const std::string& val) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/StatsJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class StatsJobExecutor : public StorageJobExecutor {
toHost_ = TargetHosts::LEADER;
}

bool check() override;
nebula::cpp2::ErrorCode check() override;

nebula::cpp2::ErrorCode prepare() override;

Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/job/StorageJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class StorageJobExecutor : public JobExecutor {
*
* @return
*/
bool check() override {
return true;
nebula::cpp2::ErrorCode check() override {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

/**
Expand Down
Loading