Skip to content

Commit

Permalink
set adminClient for job manager at the beginning
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul committed Dec 23, 2022
1 parent 8e7aa68 commit 75e95ea
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 53 deletions.
15 changes: 7 additions & 8 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,6 @@ int main(int argc, char* argv[]) {
return EXIT_FAILURE;
}

{
nebula::meta::JobManager* jobMgr = nebula::meta::JobManager::getInstance();
if (!jobMgr->init(gKVStore.get())) {
LOG(ERROR) << "Init job manager failed";
return EXIT_FAILURE;
}
}

auto godInit = initGodUser(gKVStore.get(), localhost);
if (godInit != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Init god user failed";
Expand Down Expand Up @@ -199,6 +191,13 @@ int main(int argc, char* argv[]) {
auto handler =
std::make_shared<nebula::meta::MetaServiceHandler>(gKVStore.get(), metaClusterId());
LOG(INFO) << "The meta daemon start on " << localhost;
{
nebula::meta::JobManager* jobMgr = nebula::meta::JobManager::getInstance();
if (!jobMgr->init(gKVStore.get(), handler->getAdminClient())) {
LOG(ERROR) << "Init job manager failed";
return EXIT_FAILURE;
}
}
try {
metaServer->setPort(FLAGS_port);
metaServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection
Expand Down
4 changes: 4 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
namespace nebula {
namespace meta {

AdminClient* MetaServiceHandler::getAdminClient() {
return adminClient_.get();
}

folly::Future<cpp2::ExecResp> MetaServiceHandler::future_createSpace(
const cpp2::CreateSpaceReq& req) {
auto* processor = CreateSpaceProcessor::instance(kvstore_);
Expand Down
2 changes: 2 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
kAgentHBCounters.init();
}

AdminClient* getAdminClient();

/**
* Parts distribution related operations.
* */
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/job/AdminJobProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
for (size_t i = 0; i < paras.size(); i++) {
jobIds.push_back(std::stoi(paras[i]));
}
auto ret = jobMgr_->recoverJob(spaceId_, adminClient_, jobIds);
auto ret = jobMgr_->recoverJob(spaceId_, jobIds);
if (nebula::ok(ret)) {
result.recovered_job_num_ref() = nebula::value(ret);
} else {
Expand Down Expand Up @@ -145,7 +145,7 @@ nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq
}

JobDescription jobDesc(spaceId_, nebula::value(jobId), type, paras);
auto errorCode = jobMgr_->addJob(std::move(jobDesc), adminClient_);
auto errorCode = jobMgr_->addJob(std::move(jobDesc));
if (errorCode == nebula::cpp2::ErrorCode::SUCCEEDED) {
result.job_id_ref() = nebula::value(jobId);
}
Expand Down
13 changes: 8 additions & 5 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ JobManager* JobManager::getInstance() {
return &inst;
}

bool JobManager::init(nebula::kvstore::KVStore* store) {
bool JobManager::init(nebula::kvstore::KVStore* store, AdminClient* adminClient) {
adminClient_ = adminClient;
if (store == nullptr) {
return false;
}
Expand Down Expand Up @@ -542,7 +543,11 @@ ErrorOr<nebula::cpp2::ErrorCode, std::list<TaskDescription>> JobManager::getAllT
return taskDescriptions;
}

nebula::cpp2::ErrorCode JobManager::addJob(JobDescription jobDesc, AdminClient* client) {
void JobManager::setAdminClient(AdminClient* client) {
adminClient_ = client;
}

nebula::cpp2::ErrorCode JobManager::addJob(JobDescription jobDesc) {
auto mutexIter = muJobFinished_.find(jobDesc.getSpace());
if (mutexIter == muJobFinished_.end()) {
mutexIter =
Expand All @@ -569,7 +574,6 @@ nebula::cpp2::ErrorCode JobManager::addJob(JobDescription jobDesc, AdminClient*
}
return rc;
}
adminClient_ = client;
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand Down Expand Up @@ -815,15 +819,14 @@ nebula::cpp2::ErrorCode JobManager::stopJob(GraphSpaceID spaceId, JobID jobId) {
}

ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
GraphSpaceID spaceId, AdminClient* client, const std::vector<int32_t>& jobIds) {
GraphSpaceID spaceId, const std::vector<int32_t>& jobIds) {
auto muIter = muJobFinished_.find(spaceId);
if (muIter == muJobFinished_.end()) {
muIter = muJobFinished_.emplace(spaceId, std::make_unique<std::recursive_mutex>()).first;
}
std::lock_guard<std::recursive_mutex> lk(*(muIter->second));
std::set<JobID> jobIdSet(jobIds.begin(), jobIds.end());
std::map<JobID, JobDescription> allJobs;
adminClient_ = client;
std::unique_ptr<kvstore::KVIterator> iter;
auto jobPre = MetaKeyUtils::jobPrefix(spaceId);
auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobPre, &iter);
Expand Down
10 changes: 5 additions & 5 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,26 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {

/**
* @brief Init task queue, kvStore and schedule thread
*
* @param store
* @param adminClient
* @return true if the init is successful
*/
bool init(nebula::kvstore::KVStore* store);
bool init(nebula::kvstore::KVStore* store, AdminClient* adminClient);

/**
* @brief Called when receive a system signal
*/
void shutDown();

void setAdminClient(AdminClient* client);

/**
* @brief Load job description from kvstore
*
* @param jobDesc
* @param client
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode addJob(JobDescription jobDesc, AdminClient* client);
nebula::cpp2::ErrorCode addJob(JobDescription jobDesc);

/**
* @brief The same job in inFlightJobs_.
Expand Down Expand Up @@ -157,7 +158,6 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
* @return Return error/recovered job num
*/
ErrorOr<nebula::cpp2::ErrorCode, uint32_t> recoverJob(GraphSpaceID spaceId,
AdminClient* client,
const std::vector<int32_t>& jobIds = {});

/**
Expand Down
2 changes: 1 addition & 1 deletion src/meta/test/CreateBackupProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ TEST(ProcessorTest, CreateBackupTest) {
std::vector<std::string> spaces = {"test_space"};
req.spaces_ref() = std::move(spaces);
JobManager* jobMgr = JobManager::getInstance();
ASSERT_TRUE(jobMgr->init(kv.get()));
ASSERT_TRUE(jobMgr->init(kv.get(), client.get()));
auto* processor = CreateBackupProcessor::instance(kv.get(), client.get());
auto f = processor->getFuture();
processor->process(req);
Expand Down
8 changes: 4 additions & 4 deletions src/meta/test/GetStatsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class GetStatsTest : public ::testing::Test {

jobMgr = JobManager::getInstance();
jobMgr->status_ = JobManager::JbmgrStatus::NOT_START;
jobMgr->init(kv_.get());
jobMgr->init(kv_.get(), nullptr);
}

void TearDown() override {
Expand Down Expand Up @@ -443,7 +443,7 @@ TEST_F(GetStatsTest, MockSingleMachineTest) {
// add stats job1
JobID jobId1 = 1;
JobDescription job1(spaceId, jobId1, cpp2::JobType::STATS);
jobMgr->addJob(job1, &adminClient);
jobMgr->addJob(job1);

JobCallBack cb1(jobMgr, spaceId, jobId1, 0, 100);
JobCallBack cb2(jobMgr, spaceId, 2, 0, 200);
Expand Down Expand Up @@ -491,7 +491,7 @@ TEST_F(GetStatsTest, MockSingleMachineTest) {
// add stats job2 of same space
JobID jobId2 = 2;
JobDescription job2(spaceId, jobId2, cpp2::JobType::STATS);
jobMgr->addJob(job2, &adminClient);
jobMgr->addJob(job2);

// check job result
{
Expand Down Expand Up @@ -560,7 +560,7 @@ TEST_F(GetStatsTest, MockMultiMachineTest) {
// add stats job
JobID jobId = 1;
JobDescription job(spaceId, jobId, cpp2::JobType::STATS);
jobMgr->addJob(job, &adminClient);
jobMgr->addJob(job);

JobCallBack cb1(jobMgr, spaceId, jobId, 0, 100);
JobCallBack cb2(jobMgr, spaceId, jobId, 1, 200);
Expand Down
Loading

0 comments on commit 75e95ea

Please sign in to comment.