Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
Slow query. (#495)
Browse files Browse the repository at this point in the history
* Return killed queries.

* Support kill query.

* Do not update session if the session in meta is newer.

* Return query desc when update sessions.

* Support kill multi queries.

* Address comment.
  • Loading branch information
CPWstatic committed Jul 1, 2021
1 parent 34e7841 commit da5dde0
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 5 deletions.
8 changes: 7 additions & 1 deletion src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ MetaServiceHandler::future_createSession(const cpp2::CreateSessionReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
folly::Future<cpp2::UpdateSessionsResp>
MetaServiceHandler::future_updateSessions(const cpp2::UpdateSessionsReq& req) {
auto* processor = UpdateSessionsProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
Expand All @@ -590,6 +590,12 @@ MetaServiceHandler::future_removeSession(const cpp2::RemoveSessionReq& req) {
auto* processor = RemoveSessionProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_killQuery(const cpp2::KillQueryReq& req) {
auto* processor = KillQueryProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}
} // namespace meta
} // namespace nebula

5 changes: 4 additions & 1 deletion src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::CreateSessionResp>
future_createSession(const cpp2::CreateSessionReq& req) override;

folly::Future<cpp2::ExecResp>
folly::Future<cpp2::UpdateSessionsResp>
future_updateSessions(const cpp2::UpdateSessionsReq& req) override;

folly::Future<cpp2::ListSessionsResp>
Expand All @@ -300,6 +300,9 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ExecResp>
future_removeSession(const cpp2::RemoveSessionReq& req) override;

folly::Future<cpp2::ExecResp>
future_killQuery(const cpp2::KillQueryReq& req) override;

private:
kvstore::KVStore* kvstore_ = nullptr;
ClusterID clusterId_{0};
Expand Down
79 changes: 78 additions & 1 deletion src/meta/processors/sessionMan/SessionManagerProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ void CreateSessionProcessor::process(const cpp2::CreateSessionReq& req) {
void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::sessionLock());
std::vector<kvstore::KV> data;
std::unordered_map<nebula::SessionID,
std::unordered_map<nebula::ExecutionPlanID, cpp2::QueryDesc>>
killedQueries;
for (auto& session : req.get_sessions()) {
auto sessionId = session.get_session_id();
auto sessionKey = MetaServiceUtils::sessionKey(sessionId);
Expand All @@ -62,16 +65,46 @@ void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) {
return;
}

// update sessions to be saved if query is being killed, and return them to client.
auto& newQueries = *session.queries_ref();
std::unordered_map<nebula::ExecutionPlanID, cpp2::QueryDesc> killedQueriesInCurrentSession;
auto sessionInMeta = MetaServiceUtils::parseSessionVal(nebula::value(ret));
for (const auto& savedQuery : sessionInMeta.get_queries()) {
auto epId = savedQuery.first;
auto newQuery = newQueries.find(epId);
if (newQuery == newQueries.end()) {
continue;
}
auto& desc = savedQuery.second;
if (desc.get_status() == cpp2::QueryStatus::KILLING) {
const_cast<cpp2::QueryDesc&>(newQuery->second)
.set_status(cpp2::QueryStatus::KILLING);
killedQueriesInCurrentSession.emplace(epId, desc);
}
}
if (!killedQueriesInCurrentSession.empty()) {
killedQueries[sessionId] = std::move(killedQueriesInCurrentSession);
}

if (sessionInMeta.get_update_time() > session.get_update_time()) {
continue;
}

data.emplace_back(MetaServiceUtils::sessionKey(sessionId),
MetaServiceUtils::sessionVal(session));
}

auto ret = doSyncPut(std::move(data));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Put data error on meta server, errorCode: "
<< apache::thrift::util::enumNameSafe(ret);
handleErrorCode(ret);
onFinished();
return;
}

handleErrorCode(ret);
resp_.set_killed_queries(std::move(killedQueries));
handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED);
onFinished();
}

Expand Down Expand Up @@ -145,5 +178,49 @@ void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) {
doRemove(sessionKey);
}

void KillQueryProcessor::process(const cpp2::KillQueryReq& req) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::sessionLock());
auto& killQueries = req.get_kill_queries();

std::vector<kvstore::KV> data;
for (auto& kv : killQueries) {
auto sessionId = kv.first;
auto sessionKey = MetaServiceUtils::sessionKey(sessionId);
auto ret = doGet(sessionKey);
if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
LOG(ERROR) << "Session id `" << sessionId << "' not found";
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
errCode = nebula::cpp2::ErrorCode::E_SESSION_NOT_FOUND;
}
handleErrorCode(errCode);
onFinished();
return;
}

auto session = MetaServiceUtils::parseSessionVal(nebula::value(ret));
for (auto& epId : kv.second) {
auto query = session.queries_ref()->find(epId);
if (query == session.queries_ref()->end()) {
handleErrorCode(nebula::cpp2::ErrorCode::E_QUERY_NOT_FOUND);
onFinished();
return;
}
query->second.set_status(cpp2::QueryStatus::KILLING);
}

data.emplace_back(MetaServiceUtils::sessionKey(sessionId),
MetaServiceUtils::sessionVal(session));
}

auto putRet = doSyncPut(std::move(data));
if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Put data error on meta server, errorCode: "
<< apache::thrift::util::enumNameSafe(putRet);
}
handleErrorCode(putRet);
onFinished();
}

} // namespace meta
} // namespace nebula
17 changes: 15 additions & 2 deletions src/meta/processors/sessionMan/SessionManagerProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class CreateSessionProcessor : public BaseProcessor<cpp2::CreateSessionResp> {
};


class UpdateSessionsProcessor : public BaseProcessor<cpp2::ExecResp> {
class UpdateSessionsProcessor : public BaseProcessor<cpp2::UpdateSessionsResp> {
public:
static UpdateSessionsProcessor* instance(kvstore::KVStore* kvstore) {
return new UpdateSessionsProcessor(kvstore);
Expand All @@ -37,7 +37,7 @@ class UpdateSessionsProcessor : public BaseProcessor<cpp2::ExecResp> {

private:
explicit UpdateSessionsProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::ExecResp>(kvstore) {}
: BaseProcessor<cpp2::UpdateSessionsResp>(kvstore) {}
};


Expand Down Expand Up @@ -80,6 +80,19 @@ class RemoveSessionProcessor : public BaseProcessor<cpp2::ExecResp> {
explicit RemoveSessionProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::ExecResp>(kvstore) {}
};

class KillQueryProcessor : public BaseProcessor<cpp2::ExecResp> {
public:
static KillQueryProcessor* instance(kvstore::KVStore* kvstore) {
return new KillQueryProcessor(kvstore);
}

void process(const cpp2::KillQueryReq& req);

private:
explicit KillQueryProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::ExecResp>(kvstore) {}
};
} // namespace meta
} // namespace nebula
#endif // META_SESSIONMANAGERPROCESSOR_H
43 changes: 43 additions & 0 deletions src/meta/test/ProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2600,6 +2600,7 @@ TEST(ProcessorTest, SessionManagerTest) {
TestUtils::createSomeHosts(kv.get());
TestUtils::assembleSpace(kv.get(), 1, 1);
SessionID sessionId = 0;
ExecutionPlanID epId = 1;
{
cpp2::CreateUserReq req;
req.set_if_not_exists(false);
Expand Down Expand Up @@ -2630,13 +2631,16 @@ TEST(ProcessorTest, SessionManagerTest) {
meta::cpp2::Session session;
session.set_session_id(sessionId);
session.set_space_name("test");
session.set_update_time(time::WallClock::fastNowInMicroSec());
session.queries_ref()->emplace(epId, cpp2::QueryDesc());
req.set_sessions({session});

auto* processor = UpdateSessionsProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
ASSERT_TRUE(resp.get_killed_queries().empty());
}
// list session
{
Expand All @@ -2647,6 +2651,8 @@ TEST(ProcessorTest, SessionManagerTest) {
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
ASSERT_EQ(resp.get_sessions().size(), 1);
ASSERT_EQ("test", resp.get_sessions()[0].get_space_name());
}
// get session
{
Expand All @@ -2660,6 +2666,43 @@ TEST(ProcessorTest, SessionManagerTest) {
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
ASSERT_EQ("test", resp.get_session().get_space_name());
}
// Kill query
{
cpp2::KillQueryReq killReq;
std::unordered_map<SessionID, std::unordered_set<ExecutionPlanID>> killedQueries;
std::unordered_set<ExecutionPlanID> eps = {epId};
killedQueries.emplace(sessionId, std::move(eps));
killReq.set_kill_queries(std::move(killedQueries));

auto* processor = KillQueryProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(killReq);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
// update session and get all killed queries
{
cpp2::UpdateSessionsReq req;
meta::cpp2::Session session;
session.set_session_id(sessionId);
session.set_space_name("test");
session.queries_ref()->emplace(epId, cpp2::QueryDesc());
req.set_sessions({session});

auto* processor = UpdateSessionsProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
auto& killedQueries = resp.get_killed_queries();
EXPECT_EQ(killedQueries.size(), 1);
for (auto& s : killedQueries) {
EXPECT_EQ(s.first, sessionId);
for (auto& q : s.second) {
EXPECT_EQ(q.first, epId);
}
}
}
// delete session
{
cpp2::RemoveSessionReq delReq;
Expand Down

0 comments on commit da5dde0

Please sign in to comment.