Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
Support kill multi queries.
Browse files Browse the repository at this point in the history
  • Loading branch information
CPWstatic committed Jun 28, 2021
1 parent dfd2050 commit 1e951cd
Showing 1 changed file with 28 additions and 24 deletions.
52 changes: 28 additions & 24 deletions src/meta/processors/sessionMan/SessionManagerProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,34 +179,38 @@ void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) {

void KillQueryProcessor::process(const cpp2::KillQueryReq& req) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::sessionLock());
auto sessionId = req.get_session_id();
auto epId = req.get_ep_id();
auto& killQueries = req.get_kill_queries();

auto sessionKey = MetaServiceUtils::sessionKey(sessionId);
auto ret = doGet(sessionKey);
if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
LOG(ERROR) << "Session id `" << sessionId << "' not found";
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
errCode = nebula::cpp2::ErrorCode::E_SESSION_NOT_FOUND;
std::vector<kvstore::KV> data;
for (auto& kv : killQueries) {
auto sessionId = kv.first;
auto sessionKey = MetaServiceUtils::sessionKey(sessionId);
auto ret = doGet(sessionKey);
if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
LOG(ERROR) << "Session id `" << sessionId << "' not found";
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
errCode = nebula::cpp2::ErrorCode::E_SESSION_NOT_FOUND;
}
handleErrorCode(errCode);
onFinished();
return;
}
handleErrorCode(errCode);
onFinished();
return;
}

auto session = MetaServiceUtils::parseSessionVal(nebula::value(ret));
auto query = session.queries_ref()->find(epId);
if (query == session.queries_ref()->end()) {
handleErrorCode(nebula::cpp2::ErrorCode::E_QUERY_NOT_FOUND);
onFinished();
return;
}
auto session = MetaServiceUtils::parseSessionVal(nebula::value(ret));
for (auto& epId : kv.second) {
auto query = session.queries_ref()->find(epId);
if (query == session.queries_ref()->end()) {
handleErrorCode(nebula::cpp2::ErrorCode::E_QUERY_NOT_FOUND);
onFinished();
return;
}
query->second.set_status(cpp2::QueryStatus::KILLING);
}

query->second.set_status(cpp2::QueryStatus::KILLING);
std::vector<kvstore::KV> data;
data.emplace_back(MetaServiceUtils::sessionKey(sessionId),
MetaServiceUtils::sessionVal(session));
data.emplace_back(MetaServiceUtils::sessionKey(sessionId),
MetaServiceUtils::sessionVal(session));
}

auto putRet = doSyncPut(std::move(data));
if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down

0 comments on commit 1e951cd

Please sign in to comment.