Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor job manager part4 #4067

Merged
merged 2 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,8 @@ std::string MetaKeyUtils::jobVal(const meta::cpp2::JobType& type,
std::vector<std::string> paras,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime) {
int64_t stopTime,
nebula::cpp2::ErrorCode errCode) {
std::string val;
val.reserve(256);
val.append(reinterpret_cast<const char*>(&type), sizeof(meta::cpp2::JobType));
Expand All @@ -1412,11 +1413,17 @@ std::string MetaKeyUtils::jobVal(const meta::cpp2::JobType& type,
}
val.append(reinterpret_cast<const char*>(&jobStatus), sizeof(meta::cpp2::JobStatus))
.append(reinterpret_cast<const char*>(&startTime), sizeof(int64_t))
.append(reinterpret_cast<const char*>(&stopTime), sizeof(int64_t));
.append(reinterpret_cast<const char*>(&stopTime), sizeof(int64_t))
.append(reinterpret_cast<const char*>(&errCode), sizeof(nebula::cpp2::ErrorCode));
return val;
}

std::tuple<meta::cpp2::JobType, std::vector<std::string>, meta::cpp2::JobStatus, int64_t, int64_t>
std::tuple<meta::cpp2::JobType,
std::vector<std::string>,
meta::cpp2::JobStatus,
int64_t,
int64_t,
nebula::cpp2::ErrorCode>
MetaKeyUtils::parseJobVal(folly::StringPiece rawVal) {
CHECK_GE(rawVal.size(),
sizeof(meta::cpp2::JobType) + sizeof(size_t) + sizeof(meta::cpp2::JobStatus) +
Expand All @@ -1439,7 +1446,9 @@ MetaKeyUtils::parseJobVal(folly::StringPiece rawVal) {
auto tStart = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
offset += sizeof(int64_t);
auto tStop = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
return std::make_tuple(type, paras, status, tStart, tStop);
offset += sizeof(int64_t);
auto errCode = *reinterpret_cast<const nebula::cpp2::ErrorCode*>(rawVal.data() + offset);
return std::make_tuple(type, paras, status, tStart, tStop, errCode);
}

std::pair<GraphSpaceID, JobID> MetaKeyUtils::parseJobKey(folly::StringPiece key) {
Expand Down Expand Up @@ -1473,20 +1482,23 @@ std::tuple<GraphSpaceID, JobID, TaskID> MetaKeyUtils::parseTaskKey(folly::String
std::string MetaKeyUtils::taskVal(HostAddr host,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime) {
int64_t stopTime,
nebula::cpp2::ErrorCode errCode) {
std::string val;
val.reserve(128);
val.append(MetaKeyUtils::serializeHostAddr(host))
.append(reinterpret_cast<const char*>(&jobStatus), sizeof(meta::cpp2::JobStatus))
.append(reinterpret_cast<const char*>(&startTime), sizeof(int64_t))
.append(reinterpret_cast<const char*>(&stopTime), sizeof(int64_t));
.append(reinterpret_cast<const char*>(&stopTime), sizeof(int64_t))
.append(reinterpret_cast<const char*>(&errCode), sizeof(nebula::cpp2::ErrorCode));
return val;
}

std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t> MetaKeyUtils::parseTaskVal(
folly::StringPiece rawVal) {
std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t, nebula::cpp2::ErrorCode>
MetaKeyUtils::parseTaskVal(folly::StringPiece rawVal) {
CHECK_GE(rawVal.size(),
sizeof(size_t) + sizeof(Port) + sizeof(meta::cpp2::JobStatus) + sizeof(int64_t) * 2);
sizeof(size_t) + sizeof(Port) + sizeof(meta::cpp2::JobStatus) + sizeof(int64_t) * 2 +
sizeof(nebula::cpp2::ErrorCode));
size_t offset = 0;
HostAddr host = MetaKeyUtils::deserializeHostAddr(rawVal);
offset += sizeof(size_t);
Expand All @@ -1498,7 +1510,9 @@ std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t> MetaKeyUtils::pars
auto tStart = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
offset += sizeof(int64_t);
auto tStop = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
return std::make_tuple(host, status, tStart, tStop);
offset += sizeof(int64_t);
auto errCode = *reinterpret_cast<const nebula::cpp2::ErrorCode*>(rawVal.data() + offset);
return std::make_tuple(host, status, tStart, tStop, errCode);
}

} // namespace nebula
24 changes: 15 additions & 9 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -442,14 +442,19 @@ class MetaKeyUtils final {
std::vector<std::string> paras,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime);
int64_t stopTime,
nebula::cpp2::ErrorCode errCode);
/**
* @brief Decode val from kvstore, return
* {jobType, paras, status, start time, stop time}
* {jobType, paras, status, start time, stop time, error code}
*/
static std::
tuple<meta::cpp2::JobType, std::vector<std::string>, meta::cpp2::JobStatus, int64_t, int64_t>
parseJobVal(folly::StringPiece rawVal);
static std::tuple<meta::cpp2::JobType,
std::vector<std::string>,
meta::cpp2::JobStatus,
int64_t,
int64_t,
nebula::cpp2::ErrorCode>
parseJobVal(folly::StringPiece rawVal);

static std::pair<GraphSpaceID, JobID> parseJobKey(folly::StringPiece key);

Expand All @@ -464,14 +469,15 @@ class MetaKeyUtils final {
static std::string taskVal(HostAddr host,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime);
int64_t stopTime,
nebula::cpp2::ErrorCode errCode);

/**
* @brief Decode task val,it should be
* {host, status, start time, stop time}
* {host, status, start time, stop time, error code}
*/
static std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t> parseTaskVal(
folly::StringPiece rawVal);
static std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t, nebula::cpp2::ErrorCode>
parseTaskVal(folly::StringPiece rawVal);
};

} // namespace nebula
Expand Down
19 changes: 14 additions & 5 deletions src/graph/executor/admin/SubmitJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,12 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
const nebula::meta::cpp2::JobDesc &jd, const std::vector<nebula::meta::cpp2::TaskDesc> &td) {
if (jd.get_type() == meta::cpp2::JobType::DATA_BALANCE ||
jd.get_type() == meta::cpp2::JobType::ZONE_BALANCE) {
nebula::DataSet v(
{"Job Id(spaceId:partId)", "Command(src->dst)", "Status", "Start Time", "Stop Time"});
nebula::DataSet v({"Job Id(spaceId:partId)",
"Command(src->dst)",
"Status",
"Start Time",
"Stop Time",
"Error Code"});
const auto &paras = jd.get_paras();
size_t index = std::stoul(paras.back());
uint32_t total = paras.size() - index - 1, succeeded = 0, failed = 0, inProgress = 0,
Expand All @@ -122,7 +126,8 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
apache::thrift::util::enumNameSafe(jd.get_type()),
apache::thrift::util::enumNameSafe(jd.get_status()),
convertJobTimestampToDateTime(jd.get_start_time()).toString(),
convertJobTimestampToDateTime(jd.get_stop_time()).toString()}));
convertJobTimestampToDateTime(jd.get_stop_time()).toString(),
apache::thrift::util::enumNameSafe(jd.get_code())}));
for (size_t i = index; i < paras.size() - 1; i++) {
meta::cpp2::BalanceTask tsk;
apache::thrift::CompactSerializer::deserialize(paras[i], tsk);
Expand All @@ -144,7 +149,8 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
std::move(tsk).get_command(),
apache::thrift::util::enumNameSafe(tsk.get_result()),
convertJobTimestampToDateTime(std::move(tsk).get_start_time()),
convertJobTimestampToDateTime(std::move(tsk).get_stop_time())}));
convertJobTimestampToDateTime(std::move(tsk).get_stop_time()),
apache::thrift::util::enumNameSafe(jd.get_code())}));
}
v.emplace_back(Row({folly::sformat("Total:{}", total),
folly::sformat("Succeeded:{}", succeeded),
Expand All @@ -153,13 +159,15 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
folly::sformat("Invalid:{}", invalid)}));
return v;
} else {
nebula::DataSet v({"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"});
nebula::DataSet v(
{"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time", "Error Code"});
v.emplace_back(nebula::Row({
jd.get_job_id(),
apache::thrift::util::enumNameSafe(jd.get_type()),
apache::thrift::util::enumNameSafe(jd.get_status()),
convertJobTimestampToDateTime(jd.get_start_time()),
convertJobTimestampToDateTime(jd.get_stop_time()),
apache::thrift::util::enumNameSafe(jd.get_code()),
}));
// tasks desc
for (const auto &taskDesc : td) {
Expand All @@ -169,6 +177,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
apache::thrift::util::enumNameSafe(taskDesc.get_status()),
convertJobTimestampToDateTime(taskDesc.get_start_time()),
convertJobTimestampToDateTime(taskDesc.get_stop_time()),
apache::thrift::util::enumNameSafe(taskDesc.get_code()),
}));
}
return v;
Expand Down
4 changes: 3 additions & 1 deletion src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ struct JobDesc {
5: JobStatus status,
6: i64 start_time,
7: i64 stop_time,
8: common.ErrorCode code,
}

struct TaskDesc {
Expand All @@ -277,6 +278,7 @@ struct TaskDesc {
5: JobStatus status,
6: i64 start_time,
7: i64 stop_time,
8: common.ErrorCode code,
}

struct AdminJobResult {
Expand Down Expand Up @@ -517,7 +519,7 @@ struct GetPartsAllocResp {

// get workerid for snowflake
struct GetWorkerIdReq {
1: binary host,
1: binary host,
}

struct GetWorkerIdResp {
Expand Down
18 changes: 14 additions & 4 deletions src/meta/processors/job/JobDescription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ JobDescription::JobDescription(GraphSpaceID space,
std::vector<std::string> paras,
Status status,
int64_t startTime,
int64_t stopTime)
int64_t stopTime,
nebula::cpp2::ErrorCode errCode)
: space_(space),
jobId_(jobId),
type_(type),
paras_(std::move(paras)),
status_(status),
startTime_(startTime),
stopTime_(stopTime) {}
stopTime_(stopTime),
errCode_(errCode) {}

ErrorOr<nebula::cpp2::ErrorCode, JobDescription> JobDescription::makeJobDescription(
folly::StringPiece rawkey, folly::StringPiece rawval) {
Expand All @@ -52,8 +54,15 @@ ErrorOr<nebula::cpp2::ErrorCode, JobDescription> JobDescription::makeJobDescript
auto status = std::get<2>(tup);
auto startTime = std::get<3>(tup);
auto stopTime = std::get<4>(tup);
return JobDescription(
spaceIdAndJob.first, spaceIdAndJob.second, type, paras, status, startTime, stopTime);
auto errCode = std::get<5>(tup);
return JobDescription(spaceIdAndJob.first,
spaceIdAndJob.second,
type,
paras,
status,
startTime,
stopTime,
errCode);
} catch (std::exception& ex) {
LOG(INFO) << ex.what();
}
Expand All @@ -69,6 +78,7 @@ cpp2::JobDesc JobDescription::toJobDesc() {
ret.status_ref() = status_;
ret.start_time_ref() = startTime_;
ret.stop_time_ref() = stopTime_;
ret.code_ref() = errCode_;
return ret;
}

Expand Down
12 changes: 11 additions & 1 deletion src/meta/processors/job/JobDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class JobDescription {
std::vector<std::string> paras = {},
Status status = Status::QUEUE,
int64_t startTime = 0,
int64_t stopTime = 0);
int64_t stopTime = 0,
nebula::cpp2::ErrorCode errCode = nebula::cpp2::ErrorCode::E_UNKNOWN);

/**
* @brief Return the JobDescription if both key & val is valid
Expand Down Expand Up @@ -119,6 +120,14 @@ class JobDescription {
return stopTime_;
}

void setErrorCode(nebula::cpp2::ErrorCode errCode) {
errCode_ = errCode;
}

nebula::cpp2::ErrorCode getErrorCode() {
return errCode_;
}

/**
* @brief
* Get a existed job from kvstore, return folly::none if there isn't
Expand Down Expand Up @@ -167,6 +176,7 @@ class JobDescription {
Status status_;
int64_t startTime_;
int64_t stopTime_;
nebula::cpp2::ErrorCode errCode_{nebula::cpp2::ErrorCode::E_UNKNOWN};
};

} // namespace meta
Expand Down
45 changes: 38 additions & 7 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() {
for (auto& jd : jds) {
jd.setStatus(cpp2::JobStatus::QUEUE, true);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(
jd.getJobType(), jd.getParas(), jd.getStatus(), jd.getStartTime(), jd.getStopTime());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
jd.getStatus(),
jd.getStartTime(),
jd.getStopTime(),
jd.getErrorCode());
save(jobKey, jobVal);
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -143,7 +147,8 @@ void JobManager::scheduleThread() {
jobDesc.getParas(),
jobDesc.getStatus(),
jobDesc.getStartTime(),
jobDesc.getStopTime());
jobDesc.getStopTime(),
jobDesc.getErrorCode());
save(jobKey, jobVal);
spaceRunningJobs_.insert_or_assign(spaceId, true);
if (!runJobInternal(jobDesc, jobOp)) {
Expand Down Expand Up @@ -242,13 +247,37 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId,
return nebula::cpp2::ErrorCode::E_SAVE_JOB_FAILURE;
}

// Set the errorcode of the job
nebula::cpp2::ErrorCode jobErrCode = nebula::cpp2::ErrorCode::SUCCEEDED;
if (jobStatus != cpp2::JobStatus::FINISHED) {
// Traverse the tasks and find the first task errorcode unsuccessful
auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId);
std::unique_ptr<kvstore::KVIterator> iter;
auto rc = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobKey, &iter);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
}
for (; iter->valid(); iter->next()) {
if (MetaKeyUtils::isJobKey(iter->key())) {
continue;
}
auto tupTaskVal = MetaKeyUtils::parseTaskVal(iter->val());
jobErrCode = std::get<4>(tupTaskVal);
if (jobErrCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
}
}
}
optJobDesc.setErrorCode(jobErrCode);

spaceRunningJobs_.insert_or_assign(spaceId, false);
auto jobKey = MetaKeyUtils::jobKey(optJobDesc.getSpace(), optJobDesc.getJobId());
auto jobVal = MetaKeyUtils::jobVal(optJobDesc.getJobType(),
optJobDesc.getParas(),
optJobDesc.getStatus(),
optJobDesc.getStartTime(),
optJobDesc.getStopTime());
optJobDesc.getStopTime(),
optJobDesc.getErrorCode());
auto rc = save(jobKey, jobVal);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
Expand Down Expand Up @@ -281,6 +310,7 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td,
auto status = code == nebula::cpp2::ErrorCode::SUCCEEDED ? cpp2::JobStatus::FINISHED
: cpp2::JobStatus::FAILED;
td.setStatus(status);
td.setErrorCode(code);

auto spaceId = req.get_space_id();
auto jobId = req.get_job_id();
Expand All @@ -301,8 +331,8 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td,
}

auto taskKey = MetaKeyUtils::taskKey(td.getSpace(), td.getJobId(), td.getTaskId());
auto taskVal =
MetaKeyUtils::taskVal(td.getHost(), td.getStatus(), td.getStartTime(), td.getStopTime());
auto taskVal = MetaKeyUtils::taskVal(
td.getHost(), td.getStatus(), td.getStartTime(), td.getStopTime(), td.getErrorCode());
auto rcSave = save(taskKey, taskVal);
if (rcSave != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rcSave;
Expand Down Expand Up @@ -393,7 +423,8 @@ nebula::cpp2::ErrorCode JobManager::addJob(JobDescription& jobDesc, AdminClient*
jobDesc.getParas(),
jobDesc.getStatus(),
jobDesc.getStartTime(),
jobDesc.getStopTime());
jobDesc.getStopTime(),
jobDesc.getErrorCode());
auto rc = save(jobKey, jobVal);
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
enqueue(spaceId, jobId, JbOp::ADD, jobDesc.getJobType());
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/RebuildJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace nebula {
namespace meta {

nebula::cpp2::ErrorCode RebuildJobExecutor::prepare() {
// The last value of paras_ are index name
// The value of paras_ are index name
auto spaceRet = spaceExist();
if (spaceRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Can't find the space, spaceId " << space_;
Expand Down
Loading