From da5dde008f8d2d4de94bc987e9200cfe296b1e13 Mon Sep 17 00:00:00 2001 From: cpw <13495049+CPWstatic@users.noreply.github.com> Date: Thu, 1 Jul 2021 21:54:10 +0800 Subject: [PATCH] Slow query. (#495) * Return killed queries. * Support kill query. * Do not update session if the session in meta is newer. * Return query desc when update sessions. * Support kill multi queries. * Address comment. --- src/meta/MetaServiceHandler.cpp | 8 +- src/meta/MetaServiceHandler.h | 5 +- .../sessionMan/SessionManagerProcessor.cpp | 79 ++++++++++++++++++- .../sessionMan/SessionManagerProcessor.h | 17 +++- src/meta/test/ProcessorTest.cpp | 43 ++++++++++ 5 files changed, 147 insertions(+), 5 deletions(-) diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index f31170e04..e9242e057 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -567,7 +567,7 @@ MetaServiceHandler::future_createSession(const cpp2::CreateSessionReq& req) { RETURN_FUTURE(processor); } -folly::Future +folly::Future MetaServiceHandler::future_updateSessions(const cpp2::UpdateSessionsReq& req) { auto* processor = UpdateSessionsProcessor::instance(kvstore_); RETURN_FUTURE(processor); @@ -590,6 +590,12 @@ MetaServiceHandler::future_removeSession(const cpp2::RemoveSessionReq& req) { auto* processor = RemoveSessionProcessor::instance(kvstore_); RETURN_FUTURE(processor); } + +folly::Future +MetaServiceHandler::future_killQuery(const cpp2::KillQueryReq& req) { + auto* processor = KillQueryProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} } // namespace meta } // namespace nebula diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index 17b53bff7..ac33335dd 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -288,7 +288,7 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { folly::Future future_createSession(const cpp2::CreateSessionReq& req) override; - folly::Future + folly::Future future_updateSessions(const cpp2::UpdateSessionsReq& req) override; folly::Future @@ -300,6 +300,9 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { folly::Future future_removeSession(const cpp2::RemoveSessionReq& req) override; + folly::Future + future_killQuery(const cpp2::KillQueryReq& req) override; + private: kvstore::KVStore* kvstore_ = nullptr; ClusterID clusterId_{0}; diff --git a/src/meta/processors/sessionMan/SessionManagerProcessor.cpp b/src/meta/processors/sessionMan/SessionManagerProcessor.cpp index 846df9378..86ba35606 100644 --- a/src/meta/processors/sessionMan/SessionManagerProcessor.cpp +++ b/src/meta/processors/sessionMan/SessionManagerProcessor.cpp @@ -47,6 +47,9 @@ void CreateSessionProcessor::process(const cpp2::CreateSessionReq& req) { void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) { folly::SharedMutex::WriteHolder wHolder(LockUtils::sessionLock()); std::vector data; + std::unordered_map> + killedQueries; for (auto& session : req.get_sessions()) { auto sessionId = session.get_session_id(); auto sessionKey = MetaServiceUtils::sessionKey(sessionId); @@ -62,16 +65,46 @@ void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) { return; } + // update sessions to be saved if query is being killed, and return them to client. + auto& newQueries = *session.queries_ref(); + std::unordered_map killedQueriesInCurrentSession; + auto sessionInMeta = MetaServiceUtils::parseSessionVal(nebula::value(ret)); + for (const auto& savedQuery : sessionInMeta.get_queries()) { + auto epId = savedQuery.first; + auto newQuery = newQueries.find(epId); + if (newQuery == newQueries.end()) { + continue; + } + auto& desc = savedQuery.second; + if (desc.get_status() == cpp2::QueryStatus::KILLING) { + const_cast(newQuery->second) + .set_status(cpp2::QueryStatus::KILLING); + killedQueriesInCurrentSession.emplace(epId, desc); + } + } + if (!killedQueriesInCurrentSession.empty()) { + killedQueries[sessionId] = std::move(killedQueriesInCurrentSession); + } + + if (sessionInMeta.get_update_time() > session.get_update_time()) { + continue; + } + data.emplace_back(MetaServiceUtils::sessionKey(sessionId), MetaServiceUtils::sessionVal(session)); } + auto ret = doSyncPut(std::move(data)); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Put data error on meta server, errorCode: " << apache::thrift::util::enumNameSafe(ret); + handleErrorCode(ret); + onFinished(); + return; } - handleErrorCode(ret); + resp_.set_killed_queries(std::move(killedQueries)); + handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); onFinished(); } @@ -145,5 +178,49 @@ void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) { doRemove(sessionKey); } +void KillQueryProcessor::process(const cpp2::KillQueryReq& req) { + folly::SharedMutex::WriteHolder wHolder(LockUtils::sessionLock()); + auto& killQueries = req.get_kill_queries(); + + std::vector data; + for (auto& kv : killQueries) { + auto sessionId = kv.first; + auto sessionKey = MetaServiceUtils::sessionKey(sessionId); + auto ret = doGet(sessionKey); + if (!nebula::ok(ret)) { + auto errCode = nebula::error(ret); + LOG(ERROR) << "Session id `" << sessionId << "' not found"; + if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { + errCode = nebula::cpp2::ErrorCode::E_SESSION_NOT_FOUND; + } + handleErrorCode(errCode); + onFinished(); + return; + } + + auto session = MetaServiceUtils::parseSessionVal(nebula::value(ret)); + for (auto& epId : kv.second) { + auto query = session.queries_ref()->find(epId); + if (query == session.queries_ref()->end()) { + handleErrorCode(nebula::cpp2::ErrorCode::E_QUERY_NOT_FOUND); + onFinished(); + return; + } + query->second.set_status(cpp2::QueryStatus::KILLING); + } + + data.emplace_back(MetaServiceUtils::sessionKey(sessionId), + MetaServiceUtils::sessionVal(session)); + } + + auto putRet = doSyncPut(std::move(data)); + if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Put data error on meta server, errorCode: " + << apache::thrift::util::enumNameSafe(putRet); + } + handleErrorCode(putRet); + onFinished(); +} + } // namespace meta } // namespace nebula diff --git a/src/meta/processors/sessionMan/SessionManagerProcessor.h b/src/meta/processors/sessionMan/SessionManagerProcessor.h index 66b454ed6..3711e3540 100644 --- a/src/meta/processors/sessionMan/SessionManagerProcessor.h +++ b/src/meta/processors/sessionMan/SessionManagerProcessor.h @@ -27,7 +27,7 @@ class CreateSessionProcessor : public BaseProcessor { }; -class UpdateSessionsProcessor : public BaseProcessor { +class UpdateSessionsProcessor : public BaseProcessor { public: static UpdateSessionsProcessor* instance(kvstore::KVStore* kvstore) { return new UpdateSessionsProcessor(kvstore); @@ -37,7 +37,7 @@ class UpdateSessionsProcessor : public BaseProcessor { private: explicit UpdateSessionsProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} + : BaseProcessor(kvstore) {} }; @@ -80,6 +80,19 @@ class RemoveSessionProcessor : public BaseProcessor { explicit RemoveSessionProcessor(kvstore::KVStore* kvstore) : BaseProcessor(kvstore) {} }; + +class KillQueryProcessor : public BaseProcessor { +public: + static KillQueryProcessor* instance(kvstore::KVStore* kvstore) { + return new KillQueryProcessor(kvstore); + } + + void process(const cpp2::KillQueryReq& req); + +private: + explicit KillQueryProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} +}; } // namespace meta } // namespace nebula #endif // META_SESSIONMANAGERPROCESSOR_H diff --git a/src/meta/test/ProcessorTest.cpp b/src/meta/test/ProcessorTest.cpp index b3be8ceea..418b70db0 100644 --- a/src/meta/test/ProcessorTest.cpp +++ b/src/meta/test/ProcessorTest.cpp @@ -2600,6 +2600,7 @@ TEST(ProcessorTest, SessionManagerTest) { TestUtils::createSomeHosts(kv.get()); TestUtils::assembleSpace(kv.get(), 1, 1); SessionID sessionId = 0; + ExecutionPlanID epId = 1; { cpp2::CreateUserReq req; req.set_if_not_exists(false); @@ -2630,6 +2631,8 @@ TEST(ProcessorTest, SessionManagerTest) { meta::cpp2::Session session; session.set_session_id(sessionId); session.set_space_name("test"); + session.set_update_time(time::WallClock::fastNowInMicroSec()); + session.queries_ref()->emplace(epId, cpp2::QueryDesc()); req.set_sessions({session}); auto* processor = UpdateSessionsProcessor::instance(kv.get()); @@ -2637,6 +2640,7 @@ TEST(ProcessorTest, SessionManagerTest) { processor->process(req); auto resp = std::move(f).get(); ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + ASSERT_TRUE(resp.get_killed_queries().empty()); } // list session { @@ -2647,6 +2651,8 @@ TEST(ProcessorTest, SessionManagerTest) { processor->process(req); auto resp = std::move(f).get(); ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + ASSERT_EQ(resp.get_sessions().size(), 1); + ASSERT_EQ("test", resp.get_sessions()[0].get_space_name()); } // get session { @@ -2660,6 +2666,43 @@ TEST(ProcessorTest, SessionManagerTest) { ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); ASSERT_EQ("test", resp.get_session().get_space_name()); } + // Kill query + { + cpp2::KillQueryReq killReq; + std::unordered_map> killedQueries; + std::unordered_set eps = {epId}; + killedQueries.emplace(sessionId, std::move(eps)); + killReq.set_kill_queries(std::move(killedQueries)); + + auto* processor = KillQueryProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(killReq); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + } + // update session and get all killed queries + { + cpp2::UpdateSessionsReq req; + meta::cpp2::Session session; + session.set_session_id(sessionId); + session.set_space_name("test"); + session.queries_ref()->emplace(epId, cpp2::QueryDesc()); + req.set_sessions({session}); + + auto* processor = UpdateSessionsProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + auto& killedQueries = resp.get_killed_queries(); + EXPECT_EQ(killedQueries.size(), 1); + for (auto& s : killedQueries) { + EXPECT_EQ(s.first, sessionId); + for (auto& q : s.second) { + EXPECT_EQ(q.first, epId); + } + } + } // delete session { cpp2::RemoveSessionReq delReq;