Skip to content

Commit

Permalink
Could report error even if there is no task
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 committed Jul 19, 2022
1 parent 9ef861a commit fe932f5
Show file tree
Hide file tree
Showing 18 changed files with 101 additions and 82 deletions.
7 changes: 3 additions & 4 deletions src/meta/processors/job/BalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ BalanceJobExecutor::BalanceJobExecutor(GraphSpaceID space,
const std::vector<std::string>& paras)
: MetaJobExecutor(space, jobId, kvstore, adminClient, paras) {}

bool BalanceJobExecutor::check() {
return true;
nebula::cpp2::ErrorCode BalanceJobExecutor::check() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode BalanceJobExecutor::prepare() {
Expand Down Expand Up @@ -67,8 +67,7 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::recovery() {
plan_.reset(nullptr);
return recRet;
}
plan_->saveInStore();
return nebula::cpp2::ErrorCode::SUCCEEDED;
return plan_->saveInStore();
}

nebula::cpp2::ErrorCode BalanceJobExecutor::finish(bool) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/BalanceJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class BalanceJobExecutor : public MetaJobExecutor {
*
* @return
*/
bool check() override;
nebula::cpp2::ErrorCode check() override;

/**
* @brief See implementation in child class
Expand Down
14 changes: 7 additions & 7 deletions src/meta/processors/job/DownloadJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ DownloadJobExecutor::DownloadJobExecutor(GraphSpaceID space,
helper_ = std::make_unique<nebula::hdfs::HdfsCommandHelper>();
}

bool DownloadJobExecutor::check() {
nebula::cpp2::ErrorCode DownloadJobExecutor::check() {
if (paras_.size() != 1) {
return false;
return nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

auto& url = paras_[0];
std::string hdfsPrefix = "hdfs://";
if (url.find(hdfsPrefix) != 0) {
LOG(ERROR) << "URL should start with " << hdfsPrefix;
return false;
return nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

auto u = url.substr(hdfsPrefix.size(), url.size());
Expand All @@ -44,20 +44,20 @@ bool DownloadJobExecutor::check() {
port_ = folly::to<int32_t>(tokens[1].toString().substr(0, position).c_str());
} catch (const std::exception& ex) {
LOG(ERROR) << "URL's port parse failed: " << url;
return false;
return nebula::cpp2::ErrorCode::E_INVALID_JOB;
}
path_ =
std::make_unique<std::string>(tokens[1].toString().substr(position, tokens[1].size()));
} else {
LOG(ERROR) << "URL Parse Failed: " << url;
return false;
return nebula::cpp2::ErrorCode::E_INVALID_JOB;
}
} else {
LOG(ERROR) << "URL Parse Failed: " << url;
return false;
return nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

return true;
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode DownloadJobExecutor::prepare() {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/DownloadJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class DownloadJobExecutor : public SimpleConcurrentJobExecutor {
AdminClient* adminClient,
const std::vector<std::string>& params);

bool check() override;
nebula::cpp2::ErrorCode check() override;

nebula::cpp2::ErrorCode prepare() override;

Expand Down
7 changes: 4 additions & 3 deletions src/meta/processors/job/IngestJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ IngestJobExecutor::IngestJobExecutor(GraphSpaceID space,
const std::vector<std::string>& paras)
: SimpleConcurrentJobExecutor(space, jobId, kvstore, adminClient, paras) {}

bool IngestJobExecutor::check() {
return paras_.empty();
nebula::cpp2::ErrorCode IngestJobExecutor::check() {
return paras_.empty() ? nebula::cpp2::ErrorCode::SUCCEEDED
: nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

nebula::cpp2::ErrorCode IngestJobExecutor::prepare() {
Expand All @@ -36,7 +37,7 @@ folly::Future<Status> IngestJobExecutor::executeInternal(HostAddr&& address,
taskId_++,
space_,
std::move(address),
taskParameters_,
{},
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
Expand Down
5 changes: 1 addition & 4 deletions src/meta/processors/job/IngestJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@ class IngestJobExecutor : public SimpleConcurrentJobExecutor {
AdminClient* adminClient,
const std::vector<std::string>& params);

bool check() override;
nebula::cpp2::ErrorCode check() override;

nebula::cpp2::ErrorCode prepare() override;

folly::Future<Status> executeInternal(HostAddr&& address,
std::vector<PartitionID>&& parts) override;

private:
std::vector<std::string> taskParameters_;
};

} // namespace meta
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/JobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class JobExecutor {
*
* @return
*/
virtual bool check() = 0;
virtual nebula::cpp2::ErrorCode check() = 0;

/**
* @brief Prepare the Job info from the arguments.
Expand Down
92 changes: 53 additions & 39 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,39 +156,41 @@ void JobManager::scheduleThread() {
jobDesc.getErrorCode());
save(jobKey, jobVal);
spaceRunningJobs_.insert_or_assign(spaceId, true);
if (!runJobInternal(jobDesc, jobOp)) {
jobFinished(spaceId, jodId, cpp2::JobStatus::FAILED);
auto code = runJobInternal(jobDesc, jobOp);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
jobFinished(spaceId, jodId, cpp2::JobStatus::FAILED, code);
}
}
}

bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
nebula::cpp2::ErrorCode JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
auto je = JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
JobExecutor* jobExec = je.get();

runningJobs_.emplace(jobDesc.getJobId(), std::move(je));
if (jobExec == nullptr) {
LOG(INFO) << "unreconized job type "
<< apache::thrift::util::enumNameSafe(jobDesc.getJobType());
return false;
return nebula::cpp2::ErrorCode::E_ADD_JOB_FAILURE;
}

if (jobDesc.getStatus() == cpp2::JobStatus::STOPPED) {
jobExec->stop();
return true;
}

if (!jobExec->check()) {
auto code = jobExec->check();
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Job Executor check failed";
return false;
return code;
}

if (jobExec->prepare() != nebula::cpp2::ErrorCode::SUCCEEDED) {
code = jobExec->prepare();
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Job Executor prepare failed";
return false;
return code;
}
if (op == JbOp::RECOVER) {
jobExec->recovery();
code = jobExec->recovery();
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Recover job failed";
return code;
}
}
if (jobExec->isMetaJob()) {
jobExec->setFinishCallBack([this, jobDesc](meta::cpp2::JobStatus status) {
Expand All @@ -206,11 +208,7 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
}
});
}
if (jobExec->execute() != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Job dispatch failed";
return false;
}
return true;
return jobExec->execute();
}

void JobManager::cleanJob(JobID jobId) {
Expand All @@ -226,14 +224,18 @@ void JobManager::cleanJob(JobID jobId) {
}
}

nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId,
JobID jobId,
cpp2::JobStatus jobStatus) {
nebula::cpp2::ErrorCode JobManager::jobFinished(
GraphSpaceID spaceId,
JobID jobId,
cpp2::JobStatus jobStatus,
std::optional<nebula::cpp2::ErrorCode> jobErrorCode) {
LOG(INFO) << folly::sformat("{}, spaceId={}, jobId={}, result={}",
__func__,
spaceId,
jobId,
apache::thrift::util::enumNameSafe(jobStatus));
DCHECK(jobStatus == cpp2::JobStatus::FINISHED || jobStatus == cpp2::JobStatus::FAILED ||
jobStatus == cpp2::JobStatus::STOPPED);
// normal job finish may race to job stop
auto mutexIter = muJobFinished_.find(spaceId);
if (mutexIter == muJobFinished_.end()) {
Expand All @@ -260,28 +262,36 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId,
return nebula::cpp2::ErrorCode::E_SAVE_JOB_FAILURE;
}

// Set the errorcode of the job
nebula::cpp2::ErrorCode jobErrCode = nebula::cpp2::ErrorCode::SUCCEEDED;
// If the job is marked as FAILED, one of the following will be triggered
// 1. If any of the task failed, set the errorcode of the job to the failed task code.
// 2. The job failed before running any task (e.g. in check or prepare), the error code of the job
// will be set as it
if (jobStatus == cpp2::JobStatus::FAILED) {
// Traverse the tasks and find the first task errorcode unsuccessful
auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId);
std::unique_ptr<kvstore::KVIterator> iter;
auto rc = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobKey, &iter);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
}
for (; iter->valid(); iter->next()) {
if (MetaKeyUtils::isJobKey(iter->key())) {
continue;
if (!jobErrorCode.has_value()) {
// Traverse the tasks and find the first task errorcode unsuccessful
auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId);
std::unique_ptr<kvstore::KVIterator> iter;
auto rc = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobKey, &iter);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
}
auto tupTaskVal = MetaKeyUtils::parseTaskVal(iter->val());
jobErrCode = std::get<4>(tupTaskVal);
if (jobErrCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
for (; iter->valid(); iter->next()) {
if (MetaKeyUtils::isJobKey(iter->key())) {
continue;
}
auto tupTaskVal = MetaKeyUtils::parseTaskVal(iter->val());
auto taskErrorCode = std::get<4>(tupTaskVal);
if (taskErrorCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
optJobDesc.setErrorCode(taskErrorCode);
break;
}
}
} else {
optJobDesc.setErrorCode(jobErrorCode.value());
}
} else if (jobStatus == cpp2::JobStatus::FINISHED) {
optJobDesc.setErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED);
}
optJobDesc.setErrorCode(jobErrCode);

spaceRunningJobs_.insert_or_assign(spaceId, false);
auto jobKey = MetaKeyUtils::jobKey(optJobDesc.getSpace(), optJobDesc.getJobId());
Expand Down Expand Up @@ -359,6 +369,9 @@ void JobManager::compareChangeStatus(JbmgrStatus expected, JbmgrStatus desired)
status_.compare_exchange_strong(ex, desired, std::memory_order_acq_rel);
}

// Only the job which execute on storaged will trigger this function. Storage will report to meta
// when the task has been executed. In other words, when storage report the task state, it should be
// one of FINISHED, FAILED or STOPPED.
nebula::cpp2::ErrorCode JobManager::reportTaskFinish(const cpp2::ReportTaskReq& req) {
auto spaceId = req.get_space_id();
auto jobId = req.get_job_id();
Expand Down Expand Up @@ -395,6 +408,7 @@ nebula::cpp2::ErrorCode JobManager::reportTaskFinish(const cpp2::ReportTaskReq&
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

// the status of task will be set as eithor FINISHED or FAILED in saveTaskStatus
auto rc = saveTaskStatus(*task, req);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
Expand Down
14 changes: 10 additions & 4 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,16 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
*
* @param spaceId
* @param jobId
* @param jobStatus
* @param jobStatus Will be one of the FINISHED, FAILED or STOPPED.
* @param jobErrorCode Will be specified when the job failed before any tasks executed, e.g. when
* check or prepare.
* @return cpp2::ErrorCode if error when write to kv store
*/
nebula::cpp2::ErrorCode jobFinished(GraphSpaceID spaceId, JobID jobId, cpp2::JobStatus jobStatus);
nebula::cpp2::ErrorCode jobFinished(GraphSpaceID spaceId,
JobID jobId,
cpp2::JobStatus jobStatus,
std::optional<nebula::cpp2::ErrorCode> jobErrorCode =
std::optional<nebula::cpp2::ErrorCode>());

/**
* @brief Report task finished.
Expand Down Expand Up @@ -228,9 +234,9 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
*
* @param jobDesc
* @param op
* @return true if all task dispatched, else false.
* @return error code
*/
bool runJobInternal(const JobDescription& jobDesc, JbOp op);
nebula::cpp2::ErrorCode runJobInternal(const JobDescription& jobDesc, JbOp op);

ErrorOr<nebula::cpp2::ErrorCode, GraphSpaceID> getSpaceId(const std::string& name);

Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/job/MetaJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ DECLARE_uint32(expired_time_factor);

namespace nebula {
namespace meta {
bool MetaJobExecutor::check() {
return true;
nebula::cpp2::ErrorCode MetaJobExecutor::check() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

// Prepare the Job info from the arguments.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/MetaJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class MetaJobExecutor : public JobExecutor {
*
* @return
*/
bool check() override;
nebula::cpp2::ErrorCode check() override;

/**
* @brief Prepare the Job info from the arguments.
Expand Down
5 changes: 3 additions & 2 deletions src/meta/processors/job/SimpleConcurrentJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ SimpleConcurrentJobExecutor::SimpleConcurrentJobExecutor(GraphSpaceID space,
const std::vector<std::string>& paras)
: StorageJobExecutor(space, jobId, kvstore, adminClient, paras) {}

bool SimpleConcurrentJobExecutor::check() {
return paras_.empty();
nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::check() {
return paras_.empty() ? nebula::cpp2::ErrorCode::SUCCEEDED
: nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/SimpleConcurrentJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class SimpleConcurrentJobExecutor : public StorageJobExecutor {
AdminClient* adminClient,
const std::vector<std::string>& params);

bool check() override;
nebula::cpp2::ErrorCode check() override;

nebula::cpp2::ErrorCode prepare() override;

Expand Down
5 changes: 3 additions & 2 deletions src/meta/processors/job/StatsJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
namespace nebula {
namespace meta {

bool StatsJobExecutor::check() {
return paras_.empty();
nebula::cpp2::ErrorCode StatsJobExecutor::check() {
return paras_.empty() ? nebula::cpp2::ErrorCode::SUCCEEDED
: nebula::cpp2::ErrorCode::E_INVALID_JOB;
}

nebula::cpp2::ErrorCode StatsJobExecutor::save(const std::string& key, const std::string& val) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/StatsJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class StatsJobExecutor : public StorageJobExecutor {
toHost_ = TargetHosts::LEADER;
}

bool check() override;
nebula::cpp2::ErrorCode check() override;

nebula::cpp2::ErrorCode prepare() override;

Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/job/StorageJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class StorageJobExecutor : public JobExecutor {
*
* @return
*/
bool check() override {
return true;
nebula::cpp2::ErrorCode check() override {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

/**
Expand Down
Loading

0 comments on commit fe932f5

Please sign in to comment.