From 669694ca706476536409f9586ef68411455aed1e Mon Sep 17 00:00:00 2001 From: cpw <13495049+CPWstatic@users.noreply.github.com> Date: Fri, 2 Jul 2021 13:02:13 +0800 Subject: [PATCH] Slow query. (#1152) * Add graph session manager. * Fix usage of concurrent map. * Use emplace. * Save the queries to session. * Kill query when execute. * Update the killed query from meta. * Add show and kill query sentence. * Add validator and node. * Support kill query. * Kill the query if session is destroyed and update addr to session if get from meta. * Fix update the killed queries from meta and delete query if execute error. * Load all unexpired sessions from meta, remove all expired sessions, remove all queries saved in sessions when start graph. * Add toString and test. * Mark query killed when session sign out or expired. * Add explain. * Add test and fix top N. * Make show queries as a traverse sentence. * Support kill from pipe. * Check expr type when kill query. * Add tck test. * Test slow query seperately. * Fix group by. * Fix kill current session query. * Remove top n. * Rebase and fix. * Check sessions by listing sessions. * Address comment. * Address comment. * Rebase and fix. * Add more test. * Address comment. * Format. * Charge session later. * Fix test. --- src/context/QueryContext.h | 10 ++ src/context/test/CMakeLists.txt | 2 +- src/daemons/CMakeLists.txt | 2 +- src/executor/CMakeLists.txt | 2 + src/executor/Executor.cpp | 16 +- src/executor/admin/KillQueryExecutor.cpp | 145 ++++++++++++++++++ src/executor/admin/KillQueryExecutor.h | 34 ++++ src/executor/admin/ShowQueriesExecutor.cpp | 99 ++++++++++++ src/executor/admin/ShowQueriesExecutor.h | 31 ++++ src/executor/test/CMakeLists.txt | 3 +- src/executor/test/ShowQueriesTest.cpp | 97 ++++++++++++ src/optimizer/test/CMakeLists.txt | 2 +- src/parser/AdminSentences.cpp | 21 +++ src/parser/AdminSentences.h | 69 +++++++++ src/parser/Sentence.h | 2 + src/parser/parser.yy | 53 ++++++- src/parser/scanner.lex | 5 + src/parser/test/CMakeLists.txt | 2 +- src/parser/test/ParserTest.cpp | 39 ++++- src/parser/test/ScannerTest.cpp | 12 ++ src/planner/plan/Admin.cpp | 12 ++ src/planner/plan/Admin.h | 46 ++++++ src/planner/plan/PlanNode.cpp | 4 + src/planner/plan/PlanNode.h | 3 + src/planner/test/CMakeLists.txt | 2 +- src/service/GraphService.cpp | 7 +- src/service/GraphService.h | 4 +- src/service/PermissionCheck.cpp | 4 + src/service/QueryInstance.cpp | 3 + src/service/RequestContext.h | 10 ++ src/session/CMakeLists.txt | 4 +- src/session/ClientSession.cpp | 72 ++++++++- src/session/ClientSession.h | 19 ++- ...ionManager.cpp => GraphSessionManager.cpp} | 142 +++++++++++------ ...SessionManager.h => GraphSessionManager.h} | 37 ++--- src/util/ExpressionUtils.cpp | 1 - src/util/test/CMakeLists.txt | 2 +- src/validator/AdminValidator.cpp | 59 +++++++ src/validator/AdminValidator.h | 26 ++++ src/validator/GroupByValidator.h | 3 +- src/validator/LimitValidator.h | 4 +- src/validator/OrderByValidator.h | 4 +- src/validator/Validator.cpp | 4 + src/validator/test/CMakeLists.txt | 2 +- src/visitor/test/CMakeLists.txt | 2 +- tests/Makefile | 6 +- tests/common/nebula_service.py | 2 +- tests/conftest.py | 38 ++++- tests/job/test_session.py | 6 + tests/nebula-test-run.py | 10 +- tests/tck/conftest.py | 9 ++ .../KillSlowQueryViaDiffrentService.feature | 91 +++++++++++ .../KillSlowQueryViaSameService.feature | 91 +++++++++++ ...t_kill_slow_query_via_different_service.py | 9 ++ .../test_kill_slow_query_via_same_service.py | 9 ++ 55 files changed, 1284 insertions(+), 109 deletions(-) create mode 100644 src/executor/admin/KillQueryExecutor.cpp create mode 100644 src/executor/admin/KillQueryExecutor.h create mode 100644 src/executor/admin/ShowQueriesExecutor.cpp create mode 100644 src/executor/admin/ShowQueriesExecutor.h create mode 100644 src/executor/test/ShowQueriesTest.cpp rename src/session/{SessionManager.cpp => GraphSessionManager.cpp} (58%) rename src/session/{SessionManager.h => GraphSessionManager.h} (63%) create mode 100644 tests/tck/slowquery/KillSlowQueryViaDiffrentService.feature create mode 100644 tests/tck/slowquery/KillSlowQueryViaSameService.feature create mode 100644 tests/tck/steps/test_kill_slow_query_via_different_service.py create mode 100644 tests/tck/steps/test_kill_slow_query_via_same_service.py diff --git a/src/context/QueryContext.h b/src/context/QueryContext.h index 193bf281c..49f098347 100644 --- a/src/context/QueryContext.h +++ b/src/context/QueryContext.h @@ -129,6 +129,14 @@ class QueryContext { rctx_->resp().errorCode = ErrorCode::E_PARTIAL_SUCCEEDED; } + void markKilled() { + killed_.exchange(true); + } + + bool isKilled() const { + return killed_.load(); + } + private: void init(); @@ -147,6 +155,8 @@ class QueryContext { std::unique_ptr objPool_; std::unique_ptr idGen_; std::unique_ptr symTable_; + + std::atomic killed_{false}; }; } // namespace graph diff --git a/src/context/test/CMakeLists.txt b/src/context/test/CMakeLists.txt index 989f1ca94..8356e5e13 100644 --- a/src/context/test/CMakeLists.txt +++ b/src/context/test/CMakeLists.txt @@ -37,7 +37,7 @@ SET(CONTEXT_TEST_LIBS $ $ $ - $ + $ $ $ ) diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index 26066c57b..63310257e 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -13,7 +13,7 @@ nebula_add_executable( $ $ $ - $ + $ $ $ $ diff --git a/src/executor/CMakeLists.txt b/src/executor/CMakeLists.txt index e894aab08..177cb107e 100644 --- a/src/executor/CMakeLists.txt +++ b/src/executor/CMakeLists.txt @@ -72,6 +72,8 @@ nebula_add_library( admin/SignInTSServiceExecutor.cpp admin/SignOutTSServiceExecutor.cpp admin/SessionExecutor.cpp + admin/ShowQueriesExecutor.cpp + admin/KillQueryExecutor.cpp maintain/TagExecutor.cpp maintain/TagIndexExecutor.cpp maintain/EdgeExecutor.cpp diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp index 71db221c1..663bbedd6 100644 --- a/src/executor/Executor.cpp +++ b/src/executor/Executor.cpp @@ -46,6 +46,8 @@ #include "executor/admin/SwitchSpaceExecutor.h" #include "executor/admin/UpdateUserExecutor.h" #include "executor/admin/ZoneExecutor.h" +#include "executor/admin/ShowQueriesExecutor.h" +#include "executor/admin/KillQueryExecutor.h" #include "executor/algo/BFSShortestPathExecutor.h" #include "executor/algo/CartesianProductExecutor.h" #include "executor/algo/ConjunctPathExecutor.h" @@ -497,6 +499,12 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) { case PlanNode::Kind::kUpdateSession: { return pool->add(new UpdateSessionExecutor(node, qctx)); } + case PlanNode::Kind::kShowQueries: { + return pool->add(new ShowQueriesExecutor(node, qctx)); + } + case PlanNode::Kind::kKillQuery: { + return pool->add(new KillQueryExecutor(node, qctx)); + } case PlanNode::Kind::kUnknown: { LOG(FATAL) << "Unknown plan node kind " << static_cast(node->kind()); break; @@ -521,6 +529,12 @@ Executor::Executor(const std::string &name, const PlanNode *node, QueryContext * Executor::~Executor() {} Status Executor::open() { + if (qctx_->isKilled()) { + VLOG(1) << "Execution is being killed. session: " << qctx()->rctx()->session()->id() + << "ep: " << qctx()->plan()->id() + << "query: " << qctx()->rctx()->query(); + return Status::Error("Execution had been killed"); + } auto status = MemInfo::make(); NG_RETURN_IF_ERROR(status); auto mem = std::move(status).value(); @@ -555,7 +569,7 @@ folly::Future Executor::start(Status status) const { } folly::Future Executor::error(Status status) const { - return folly::makeFuture(ExecutionError(std::move(status))).via(runner()); + return folly::makeFuture(std::move(status)).via(runner()); } void Executor::drop() { diff --git a/src/executor/admin/KillQueryExecutor.cpp b/src/executor/admin/KillQueryExecutor.cpp new file mode 100644 index 000000000..8a5ce1585 --- /dev/null +++ b/src/executor/admin/KillQueryExecutor.cpp @@ -0,0 +1,145 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "executor/admin/KillQueryExecutor.h" + +#include "context/QueryContext.h" +#include "planner/plan/Admin.h" + +namespace nebula { +namespace graph { +folly::Future KillQueryExecutor::execute() { + SCOPED_TIMER(&execTime_); + + // TODO: permision check + + QueriesMap toBeVerifiedQueries; + QueriesMap killQueries; + NG_RETURN_IF_ERROR(verifyTheQueriesByLocalCache(toBeVerifiedQueries, killQueries)); + + return qctx() + ->getMetaClient() + ->listSessions() + .via(runner()) + .thenValue([toBeVerifiedQueries = std::move(toBeVerifiedQueries), + killQueries = std::move(killQueries), + this](StatusOr listResp) mutable { + std::vector sessionsInMeta; + if (listResp.ok()) { + sessionsInMeta = std::move(listResp.value()).get_sessions(); + } else { + LOG(ERROR) << listResp.status(); + } + + auto status = verifyTheQueriesByMetaInfo(toBeVerifiedQueries, sessionsInMeta); + if (!status.ok()) { + return folly::makeFuture>(status); + } + + killCurrentHostQueries(killQueries); + + // upload all queries to be killed to meta. + return qctx()->getMetaClient()->killQuery(std::move(killQueries)); + }) + .thenValue([](auto&& resp) { + if (!resp.ok()) { + return resp.status(); + } + return Status::OK(); + }); +} + +Status KillQueryExecutor::verifyTheQueriesByLocalCache(QueriesMap& toBeVerifiedQueries, + QueriesMap& killQueries) { + auto* killQuery = asNode(node()); + auto inputVar = killQuery->inputVar(); + auto iter = ectx_->getResult(inputVar).iter(); + DCHECK(!!iter); + QueryExpressionContext ctx(ectx_); + auto sessionExpr = killQuery->sessionId(); + auto epExpr = killQuery->epId(); + + auto* session = qctx()->rctx()->session(); + auto* sessionMgr = qctx_->rctx()->sessionMgr(); + for (; iter->valid(); iter->next()) { + auto& sessionVal = sessionExpr->eval(ctx(iter.get())); + if (!sessionVal.isInt()) { + std::stringstream ss; + ss << "Session `" << sessionExpr->toString() << "' is not kind of" + << " int, but was " << sessionVal.type(); + return Status::Error(ss.str()); + } + auto& epVal = epExpr->eval(ctx(iter.get())); + if (!epVal.isInt()) { + std::stringstream ss; + ss << "ExecutionPlanID `" << epExpr->toString() << "' is not kind of" + << " int, but was " << epVal.type(); + return Status::Error(ss.str()); + } + + auto sessionId = sessionVal.getInt(); + auto epId = epVal.getInt(); + if (sessionId == session->id() || sessionId < 0) { + if (!session->findQuery(epId)) { + return Status::Error("ExecutionPlanId[%ld] does not exist in current Session.", + epId); + } + killQueries[session->id()].emplace(epId); + } else { + auto sessionPtr = sessionMgr->findSessionFromCache(sessionId); + if (sessionPtr == nullptr) { + toBeVerifiedQueries[sessionId].emplace(epId); + } else if (!sessionPtr->findQuery(epId)) { + return Status::Error("ExecutionPlanId[%ld] does not exist in Session[%ld].", + epId, sessionId); + } + killQueries[sessionId].emplace(epId); + } + } + return Status::OK(); +} + +void KillQueryExecutor::killCurrentHostQueries(const QueriesMap& killQueries) { + auto* session = qctx()->rctx()->session(); + auto* sessionMgr = qctx_->rctx()->sessionMgr(); + for (auto& s : killQueries) { + auto sessionId = s.first; + for (auto& epId : s.second) { + if (sessionId == session->id()) { + session->markQueryKilled(epId); + } else { + auto sessionPtr = sessionMgr->findSessionFromCache(sessionId); + if (sessionPtr != nullptr) { + sessionPtr->markQueryKilled(epId); + } + } + } + } +} + +Status KillQueryExecutor::verifyTheQueriesByMetaInfo( + const QueriesMap& toBeVerifiedQueries, + const std::vector& sessionsInMeta) { + for (auto& s : toBeVerifiedQueries) { + auto sessionId = s.first; + auto found = + std::find_if(sessionsInMeta.begin(), sessionsInMeta.end(), [sessionId](auto& val) { + return val.get_session_id() == sessionId; + }); + if (found == sessionsInMeta.end()) { + return Status::Error("SessionId[%ld] does not exist", sessionId); + } + for (auto& epId : s.second) { + if (found->get_queries().find(epId) == found->get_queries().end()) { + return Status::Error( + "ExecutionPlanId[%ld] does not exist in Session[%ld].", epId, sessionId); + } + } + } + return Status::OK(); +} +} // namespace graph +} // namespace nebula diff --git a/src/executor/admin/KillQueryExecutor.h b/src/executor/admin/KillQueryExecutor.h new file mode 100644 index 000000000..5d77ea108 --- /dev/null +++ b/src/executor/admin/KillQueryExecutor.h @@ -0,0 +1,34 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef EXECUTOR_ADMIN_KILLQUERYEXECUTOR_H_ +#define EXECUTOR_ADMIN_KILLQUERYEXECUTOR_H_ + +#include "executor/Executor.h" + +namespace nebula { +namespace graph { +class KillQueryExecutor final : public Executor { +public: + KillQueryExecutor(const PlanNode *node, QueryContext *qctx) + : Executor("KillQueryExecutor", node, qctx) {} + + folly::Future execute() override; + +private: + using QueriesMap = std::unordered_map>; + Status verifyTheQueriesByLocalCache(QueriesMap& toBeVerifiedQueries, + QueriesMap& killQueries); + + Status verifyTheQueriesByMetaInfo(const QueriesMap& toBeVerifiedQueries, + const std::vector& sessionsInMeta); + + void killCurrentHostQueries(const QueriesMap& killQueries); +}; +} // namespace graph +} // namespace nebula + +#endif // EXECUTOR_ADMIN_KILLQUERYEXECUTOR_H_ diff --git a/src/executor/admin/ShowQueriesExecutor.cpp b/src/executor/admin/ShowQueriesExecutor.cpp new file mode 100644 index 000000000..2ff9cbfea --- /dev/null +++ b/src/executor/admin/ShowQueriesExecutor.cpp @@ -0,0 +1,99 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. +* +* This source code is licensed under Apache 2.0 License, +* attached with Common Clause Condition 1.0, found in the LICENSES directory. +*/ + +#include + +#include "executor/admin/ShowQueriesExecutor.h" + +#include "common/time/TimeUtils.h" +#include "context/QueryContext.h" +#include "planner/plan/Admin.h" +#include "util/ScopedTimer.h" + +namespace nebula { +namespace graph { +folly::Future ShowQueriesExecutor::execute() { + SCOPED_TIMER(&execTime_); + + auto* showQueries = asNode(node()); + auto isAll = showQueries->isAll(); + + if (!isAll) { + return showCurrentSessionQueries(); + } else { + return showAllSessionQueries(); + } +} + +folly::Future ShowQueriesExecutor::showCurrentSessionQueries() { + DataSet dataSet({"SessionID", + "ExecutionPlanID", + "User", + "Host", + "StartTime", + "DurationInUSec", + "Status", + "Query"}); + auto* session = qctx()->rctx()->session(); + auto sessionInMeta = session->getSession(); + + addQueries(sessionInMeta, dataSet); + return finish(ResultBuilder() + .value(Value(std::move(dataSet))) + .iter(Iterator::Kind::kSequential) + .finish()); +} + +// The queries might not sync to meta completely. +folly::Future ShowQueriesExecutor::showAllSessionQueries() { + return qctx()->getMetaClient()->listSessions() + .via(runner()) + .thenValue([this](StatusOr resp) { + SCOPED_TIMER(&execTime_); + if (!resp.ok()) { + return Status::Error("Show sessions failed: %s.", + resp.status().toString().c_str()); + } + auto sessions = resp.value().get_sessions(); + DataSet dataSet({"SessionID", + "ExecutionPlanID", + "User", + "Host", + "StartTime", + "DurationInUSec", + "Status", + "Query"}); + for (auto& session : sessions) { + addQueries(session, dataSet); + } + return finish(ResultBuilder() + .value(Value(std::move(dataSet))) + .iter(Iterator::Kind::kSequential) + .finish()); + }); +} + +void ShowQueriesExecutor::addQueries(const meta::cpp2::Session& session, DataSet& dataSet) const { + auto& queries = session.get_queries(); + for (auto& query : queries) { + Row row; + row.values.emplace_back(session.get_session_id()); + row.values.emplace_back(query.first); + row.values.emplace_back(session.get_user_name()); + row.values.emplace_back(query.second.get_graph_addr().toString()); + auto dateTime = + time::TimeConversion::unixSecondsToDateTime(query.second.get_start_time() / 1000000); + dateTime.microsec = query.second.get_start_time() % 1000000; + row.values.emplace_back(std::move(dateTime)); + row.values.emplace_back(query.second.get_duration()); + row.values.emplace_back(apache::thrift::util::enumNameSafe(query.second.get_status())); + row.values.emplace_back(query.second.get_query()); + dataSet.rows.emplace_back(std::move(row)); + } +} + +} // namespace graph +} // namespace nebula diff --git a/src/executor/admin/ShowQueriesExecutor.h b/src/executor/admin/ShowQueriesExecutor.h new file mode 100644 index 000000000..ac28dd2c8 --- /dev/null +++ b/src/executor/admin/ShowQueriesExecutor.h @@ -0,0 +1,31 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. +* +* This source code is licensed under Apache 2.0 License, +* attached with Common Clause Condition 1.0, found in the LICENSES directory. +*/ + +#ifndef EXECUTOR_ADMIN_SHOWQUERIESEXECUTOR_H_ +#define EXECUTOR_ADMIN_SHOWQUERIESEXECUTOR_H_ + +#include "executor/Executor.h" + +namespace nebula { +namespace graph { +class ShowQueriesExecutor final : public Executor { +public: + ShowQueriesExecutor(const PlanNode *node, QueryContext *qctx) + : Executor("ShowQueriesExecutor", node, qctx) {} + + folly::Future execute() override; + +private: + friend class ShowQueriesTest_TestAddQueryAndTopN_Test; + folly::Future showCurrentSessionQueries(); + + folly::Future showAllSessionQueries(); + + void addQueries(const meta::cpp2::Session& session, DataSet& dataSet) const; +}; +} // namespace graph +} // namespace nebula +#endif // EXECUTOR_ADMIN_SHOWQUERIESEXECUTOR_H_ diff --git a/src/executor/test/CMakeLists.txt b/src/executor/test/CMakeLists.txt index 6e761a1df..38674ec35 100644 --- a/src/executor/test/CMakeLists.txt +++ b/src/executor/test/CMakeLists.txt @@ -34,7 +34,7 @@ SET(EXEC_QUERY_TEST_OBJS $ $ $ - $ + $ $ $ $ @@ -80,6 +80,7 @@ nebula_add_test( ProduceAllPathsTest.cpp CartesianProductTest.cpp AssignTest.cpp + ShowQueriesTest.cpp OBJECTS ${EXEC_QUERY_TEST_OBJS} LIBRARIES diff --git a/src/executor/test/ShowQueriesTest.cpp b/src/executor/test/ShowQueriesTest.cpp new file mode 100644 index 000000000..16fb83b18 --- /dev/null +++ b/src/executor/test/ShowQueriesTest.cpp @@ -0,0 +1,97 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include +#include "common/time/TimeUtils.h" +#include "executor/admin/ShowQueriesExecutor.h" +#include "executor/test/QueryTestBase.h" +#include "planner/plan/Admin.h" + +namespace nebula { +namespace graph { +class ShowQueriesTest : public QueryTestBase { +}; + +TEST_F(ShowQueriesTest, TestAddQueryAndTopN) { + meta::cpp2::Session session; + session.set_session_id(1); + session.set_create_time(123); + session.set_update_time(456); + session.set_user_name("root"); + session.set_space_name("test"); + session.set_graph_addr(HostAddr("127.0.0.1", 9669)); + + { + meta::cpp2::QueryDesc desc; + desc.set_start_time(123); + desc.set_status(meta::cpp2::QueryStatus::RUNNING); + desc.set_duration(100); + desc.set_query(""); + desc.set_graph_addr(HostAddr("127.0.0.1", 9669)); + + session.queries_ref()->emplace(1, std::move(desc)); + } + { + meta::cpp2::QueryDesc desc; + desc.set_start_time(123); + desc.set_status(meta::cpp2::QueryStatus::RUNNING); + desc.set_duration(200); + desc.set_query(""); + desc.set_graph_addr(HostAddr("127.0.0.1", 9669)); + + session.queries_ref()->emplace(2, std::move(desc)); + } + + DataSet dataSet({"SessionID", + "ExecutionPlanID", + "User", + "Host", + "StartTime", + "DurationInUSec", + "Status", + "Query"}); + DataSet expected = dataSet; + { + Row row; + row.emplace_back(1); + row.emplace_back(1); + row.emplace_back("root"); + row.emplace_back("\"127.0.0.1\":9669"); + auto dateTime = + time::TimeConversion::unixSecondsToDateTime(123 / 1000000); + dateTime.microsec = 123; + row.emplace_back(std::move(dateTime)); + row.emplace_back(100); + row.emplace_back("RUNNING"); + row.emplace_back(""); + expected.rows.emplace_back(std::move(row)); + } + { + Row row; + row.emplace_back(1); + row.emplace_back(2); + row.emplace_back("root"); + row.emplace_back("\"127.0.0.1\":9669"); + auto dateTime = + time::TimeConversion::unixSecondsToDateTime(123 / 1000000); + dateTime.microsec = 123; + row.emplace_back(std::move(dateTime)); + row.emplace_back(200); + row.emplace_back("RUNNING"); + row.emplace_back(""); + expected.rows.emplace_back(std::move(row)); + } + + auto* showQueries = ShowQueries::make(qctx_.get(), nullptr, true); + ShowQueriesExecutor exe(showQueries, qctx_.get()); + exe.addQueries(session, dataSet); + EXPECT_EQ(expected.size(), dataSet.size()); + for (auto& row : expected) { + EXPECT_TRUE(std::find(dataSet.rows.begin(), dataSet.rows.end(), row) != dataSet.rows.end()); + } +} +} // namespace graph +} // namespace nebula diff --git a/src/optimizer/test/CMakeLists.txt b/src/optimizer/test/CMakeLists.txt index b6d604b3c..2c6b6df6c 100644 --- a/src/optimizer/test/CMakeLists.txt +++ b/src/optimizer/test/CMakeLists.txt @@ -34,7 +34,7 @@ set(OPTIMIZER_TEST_LIB $ $ $ - $ + $ $ $ $ diff --git a/src/parser/AdminSentences.cpp b/src/parser/AdminSentences.cpp index c975b937d..0309ed549 100644 --- a/src/parser/AdminSentences.cpp +++ b/src/parser/AdminSentences.cpp @@ -369,4 +369,25 @@ std::string ShowSessionsSentence::toString() const { return "SHOW SESSIONS"; } +std::string ShowQueriesSentence::toString() const { + std::string buf = "SHOW"; + if (isAll()) { + buf += " ALL"; + } + buf += " QUERIES"; + return buf; +} + +std::string KillQuerySentence::toString() const { + std::string buf = "KILL QUERY ("; + if (isSetSession()) { + buf += "session="; + buf += sessionId()->toString(); + buf += ", "; + } + buf += "plan="; + buf += epId()->toString(); + buf += ")"; + return buf; +} } // namespace nebula diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h index dc17e9af7..8da3e57a4 100644 --- a/src/parser/AdminSentences.h +++ b/src/parser/AdminSentences.h @@ -732,6 +732,75 @@ class ShowSessionsSentence final : public Sentence { SessionID sessionId_{0}; bool setSeesionId_{false}; }; + +class ShowQueriesSentence final : public Sentence { +public: + explicit ShowQueriesSentence(bool isAll = false) { + kind_ = Kind::kShowQueries; + isAll_ = isAll; + } + + bool isAll() const { + return isAll_; + } + + std::string toString() const override; + +private: + bool isAll_{false}; +}; + +class QueryUniqueIdentifier final { +public: + explicit QueryUniqueIdentifier(Expression* epId, Expression* sessionId) + : epId_(epId), sessionId_(sessionId) {} + + Expression* sessionId() const { + return sessionId_; + } + + Expression* epId() const { + return epId_; + } + + void setSession() { + isSetSession_ = true; + } + + bool isSetSession() const { + return isSetSession_; + } + +private: + Expression* epId_{nullptr}; + Expression* sessionId_{nullptr}; + bool isSetSession_{false}; +}; + +class KillQuerySentence final : public Sentence { +public: + explicit KillQuerySentence(QueryUniqueIdentifier* identifier) { + kind_ = Kind::kKillQuery; + identifier_.reset(identifier); + } + + Expression* sessionId() const { + return identifier_->sessionId(); + } + + Expression* epId() const { + return identifier_->epId(); + } + + std::string toString() const override; + +private: + bool isSetSession() const { + return identifier_->isSetSession(); + } + + std::unique_ptr identifier_; +}; } // namespace nebula #endif // PARSER_ADMINSENTENCES_H_ diff --git a/src/parser/Sentence.h b/src/parser/Sentence.h index 81942c603..eda75f922 100644 --- a/src/parser/Sentence.h +++ b/src/parser/Sentence.h @@ -130,6 +130,8 @@ class Sentence { kCreateFTIndex, kDropFTIndex, kShowSessions, + kShowQueries, + kKillQuery, }; Kind kind() const { diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 4188970ee..0bf5272c9 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -145,13 +145,14 @@ static constexpr size_t kCommentLengthLimit = 256; nebula::TextSearchArgument *fuzzy_text_search_argument; nebula::meta::cpp2::FTClient *text_search_client_item; nebula::TSClientList *text_search_client_list; + nebula::QueryUniqueIdentifier *query_unique_identifier; } /* destructors */ %destructor {} // Expression related memory will be managed by object pool -%destructor {} -%destructor {} +%destructor {} +%destructor {} %destructor {} %destructor { delete $$; } <*> @@ -196,6 +197,7 @@ static constexpr size_t kCommentLengthLimit = 256; %token KW_ANY KW_SINGLE KW_NONE %token KW_REDUCE %token KW_SESSIONS KW_SESSION +%token KW_KILL KW_QUERY KW_QUERIES KW_TOP /* symbols */ %token L_PAREN R_PAREN L_BRACKET R_BRACKET L_BRACE R_BRACE COMMA @@ -236,6 +238,7 @@ static constexpr size_t kCommentLengthLimit = 256; %type compound_expression %type text_search_expression %type constant_expression +%type query_unique_identifier_value %type argument_list opt_argument_list %type type_spec %type step_clause @@ -332,6 +335,8 @@ static constexpr size_t kCommentLengthLimit = 256; %type index_field %type index_field_list opt_index_field_list +%type query_unique_identifier + %type maintain_sentence %type create_space_sentence describe_space_sentence drop_space_sentence %type create_tag_sentence create_edge_sentence @@ -351,6 +356,7 @@ static constexpr size_t kCommentLengthLimit = 256; %type admin_job_sentence %type create_user_sentence alter_user_sentence drop_user_sentence change_password_sentence +%type show_queries_sentence kill_query_sentence %type show_sentence %type mutate_sentence @@ -512,6 +518,10 @@ unreserved_keyword | KW_SESSION { $$ = new std::string("session"); } | KW_SESSIONS { $$ = new std::string("sessions"); } | KW_SAMPLE { $$ = new std::string("sample"); } + | KW_QUERIES { $$ = new std::string("queries"); } + | KW_QUERY { $$ = new std::string("query"); } + | KW_KILL { $$ = new std::string("kill"); } + | KW_TOP { $$ = new std::string("top"); } ; expression @@ -2514,6 +2524,8 @@ traverse_sentence | get_subgraph_sentence { $$ = $1; } | delete_vertex_sentence { $$ = $1; } | delete_edge_sentence { $$ = $1; } + | show_queries_sentence { $$ = $1; } + | kill_query_sentence { $$ = $1; } ; piped_sentence @@ -2851,6 +2863,15 @@ job_concurrency } ; +show_queries_sentence + : KW_SHOW KW_QUERIES { + $$ = new ShowQueriesSentence(); + } + | KW_SHOW KW_ALL KW_QUERIES { + $$ = new ShowQueriesSentence(true); + } + ; + show_sentence : KW_SHOW KW_HOSTS { $$ = new ShowHostsSentence(meta::cpp2::ListHostType::ALLOC); @@ -3287,6 +3308,34 @@ list_listener_sentence } ; +kill_query_sentence + : KW_KILL KW_QUERY L_PAREN query_unique_identifier R_PAREN { + $$ = new KillQuerySentence($4); + } + +query_unique_identifier_value + : legal_integer { + $$ = ConstantExpression::make(qctx->objPool(), $1); + } + | input_prop_expression { + $$ = $1; + } + ; + +query_unique_identifier + : KW_PLAN ASSIGN query_unique_identifier_value { + $$ = new QueryUniqueIdentifier($3, ConstantExpression::make(qctx->objPool(), Value(-1))); + } + | KW_SESSION ASSIGN query_unique_identifier_value COMMA KW_PLAN ASSIGN query_unique_identifier_value { + $$ = new QueryUniqueIdentifier($7, $3); + $$->setSession(); + } + | KW_PLAN ASSIGN query_unique_identifier_value COMMA KW_SESSION ASSIGN query_unique_identifier_value { + $$ = new QueryUniqueIdentifier($3, $7); + $$->setSession(); + } + ; + mutate_sentence : insert_vertex_sentence { $$ = $1; } | insert_edge_sentence { $$ = $1; } diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index c4658dee2..bfaf183f2 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -250,6 +250,11 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5]) "SESSIONS" { return TokenType::KW_SESSIONS; } "SESSION" { return TokenType::KW_SESSION; } "SAMPLE" { return TokenType::KW_SAMPLE; } +"QUERIES" { return TokenType::KW_QUERIES; } +"QUERY" { return TokenType::KW_QUERY; } +"KILL" { return TokenType::KW_KILL; } +"TOP" { return TokenType::KW_TOP; } + "TRUE" { yylval->boolval = true; return TokenType::BOOL; } "FALSE" { yylval->boolval = false; return TokenType::BOOL; } diff --git a/src/parser/test/CMakeLists.txt b/src/parser/test/CMakeLists.txt index 1b05ef1e0..fbaf148c3 100644 --- a/src/parser/test/CMakeLists.txt +++ b/src/parser/test/CMakeLists.txt @@ -32,7 +32,7 @@ set(PARSER_TEST_LIBS $ $ $ - $ + $ $ $ $ diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index 99bbfed10..0f95ada7c 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -2779,15 +2779,17 @@ TEST_F(ParserTest, FullTextServiceTest) { auto result = parse(query); ASSERT_FALSE(result.ok()); } +} + + +TEST_F(ParserTest, SessionTest) { { - // GQLParser parser(qctx.get()); std::string query = "SHOW SESSIONS"; auto result = parse(query); ASSERT_TRUE(result.ok()) << result.status(); ASSERT_EQ(result.value()->toString(), "SHOW SESSIONS"); } { - // GQLParser parser(qctx.get()); std::string query = "SHOW SESSION 123"; auto result = parse(query); ASSERT_TRUE(result.ok()) << result.status(); @@ -2820,4 +2822,37 @@ TEST_F(ParserTest, JobTest) { checkTest("REBUILD EDGE INDEX name_index, age_index", "REBUILD EDGE INDEX name_index,age_index"); } + +TEST_F(ParserTest, ShowAndKillQueryTest) { + { + std::string query = "SHOW QUERIES"; + auto result = parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + ASSERT_EQ(result.value()->toString(), "SHOW QUERIES"); + } + { + std::string query = "SHOW ALL QUERIES"; + auto result = parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + ASSERT_EQ(result.value()->toString(), "SHOW ALL QUERIES"); + } + { + std::string query = "KILL QUERY (plan=123)"; + auto result = parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + ASSERT_EQ(result.value()->toString(), "KILL QUERY (plan=123)"); + } + { + std::string query = "KILL QUERY (session=123, plan=123)"; + auto result = parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + ASSERT_EQ(result.value()->toString(), "KILL QUERY (session=123, plan=123)"); + } + { + std::string query = "KILL QUERY (plan=123, session=123)"; + auto result = parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + ASSERT_EQ(result.value()->toString(), "KILL QUERY (session=123, plan=123)"); + } +} } // namespace nebula diff --git a/src/parser/test/ScannerTest.cpp b/src/parser/test/ScannerTest.cpp index 8abd755d3..1aed7e144 100644 --- a/src/parser/test/ScannerTest.cpp +++ b/src/parser/test/ScannerTest.cpp @@ -497,6 +497,18 @@ TEST(Scanner, Basic) { CHECK_SEMANTIC_TYPE("SESSION", TokenType::KW_SESSION), CHECK_SEMANTIC_TYPE("Session", TokenType::KW_SESSION), CHECK_SEMANTIC_TYPE("session", TokenType::KW_SESSION), + CHECK_SEMANTIC_TYPE("QUERY", TokenType::KW_QUERY), + CHECK_SEMANTIC_TYPE("Query", TokenType::KW_QUERY), + CHECK_SEMANTIC_TYPE("query", TokenType::KW_QUERY), + CHECK_SEMANTIC_TYPE("QUERIES", TokenType::KW_QUERIES), + CHECK_SEMANTIC_TYPE("Queries", TokenType::KW_QUERIES), + CHECK_SEMANTIC_TYPE("queries", TokenType::KW_QUERIES), + CHECK_SEMANTIC_TYPE("KILL", TokenType::KW_KILL), + CHECK_SEMANTIC_TYPE("Kill", TokenType::KW_KILL), + CHECK_SEMANTIC_TYPE("kill", TokenType::KW_KILL), + CHECK_SEMANTIC_TYPE("TOP", TokenType::KW_TOP), + CHECK_SEMANTIC_TYPE("Top", TokenType::KW_TOP), + CHECK_SEMANTIC_TYPE("top", TokenType::KW_TOP), CHECK_SEMANTIC_TYPE("_type", TokenType::TYPE_PROP), CHECK_SEMANTIC_TYPE("_id", TokenType::ID_PROP), diff --git a/src/planner/plan/Admin.cpp b/src/planner/plan/Admin.cpp index e450ee6d2..6f4a1d274 100644 --- a/src/planner/plan/Admin.cpp +++ b/src/planner/plan/Admin.cpp @@ -161,5 +161,17 @@ std::unique_ptr ShowBalance::explain() const { return desc; } +std::unique_ptr ShowQueries::explain() const { + auto desc = SingleDependencyNode::explain(); + addDescription("isAll", util::toJson(isAll()), desc.get()); + return desc; +} + +std::unique_ptr KillQuery::explain() const { + auto desc = SingleDependencyNode::explain(); + addDescription("sessionId", sessionId()->toString(), desc.get()); + addDescription("planId", epId()->toString(), desc.get()); + return desc; +} } // namespace graph } // namespace nebula diff --git a/src/planner/plan/Admin.h b/src/planner/plan/Admin.h index 2bad28844..48513d002 100644 --- a/src/planner/plan/Admin.h +++ b/src/planner/plan/Admin.h @@ -1367,6 +1367,52 @@ class UpdateSession final : public SingleInputNode { private: meta::cpp2::Session session_; }; + +class ShowQueries final : public SingleInputNode { +public: + static ShowQueries* make(QueryContext* qctx, PlanNode* input, bool isAll) { + return qctx->objPool()->add(new ShowQueries(qctx, input, isAll)); + } + + bool isAll() const { + return isAll_; + } + + std::unique_ptr explain() const override; + +private: + explicit ShowQueries(QueryContext* qctx, PlanNode* input, bool isAll) + : SingleInputNode(qctx, Kind::kShowQueries, input), isAll_(isAll) {} + + bool isAll_{false}; +}; + +class KillQuery final : public SingleInputNode { +public: + static KillQuery* make(QueryContext* qctx, + PlanNode* input, + Expression* sessionId, + Expression* epId) { + return qctx->objPool()->add(new KillQuery(qctx, input, sessionId, epId)); + } + + Expression* sessionId() const { + return sessionId_; + } + + Expression* epId() const { + return epId_; + } + + std::unique_ptr explain() const override; + +private: + explicit KillQuery(QueryContext* qctx, PlanNode* input, Expression* sessionId, Expression* epId) + : SingleInputNode(qctx, Kind::kKillQuery, input), sessionId_(sessionId), epId_(epId) {} + + Expression* sessionId_; + Expression* epId_; +}; } // namespace graph } // namespace nebula #endif // PLANNER_PLAN_ADMIN_H_ diff --git a/src/planner/plan/PlanNode.cpp b/src/planner/plan/PlanNode.cpp index a1fe9165e..43dde5423 100644 --- a/src/planner/plan/PlanNode.cpp +++ b/src/planner/plan/PlanNode.cpp @@ -272,6 +272,10 @@ const char* PlanNode::toString(PlanNode::Kind kind) { return "ShowSessions"; case Kind::kUpdateSession: return "UpdateSession"; + case Kind::kShowQueries: + return "ShowQueries"; + case Kind::kKillQuery: + return "KillQuery"; // no default so the compiler will warning when lack } LOG(FATAL) << "Impossible kind plan node " << static_cast(kind); diff --git a/src/planner/plan/PlanNode.h b/src/planner/plan/PlanNode.h index a4a774a2b..acc43a86a 100644 --- a/src/planner/plan/PlanNode.h +++ b/src/planner/plan/PlanNode.h @@ -162,6 +162,9 @@ class PlanNode { kIngest, kShowSessions, kUpdateSession, + + kShowQueries, + kKillQuery, }; bool isQueryNode() const { diff --git a/src/planner/test/CMakeLists.txt b/src/planner/test/CMakeLists.txt index df54a3458..9c119c10e 100644 --- a/src/planner/test/CMakeLists.txt +++ b/src/planner/test/CMakeLists.txt @@ -37,7 +37,7 @@ nebula_add_test( $ $ $ - $ + $ $ $ $ diff --git a/src/service/GraphService.cpp b/src/service/GraphService.cpp index d3a358a66..f2b8987d5 100644 --- a/src/service/GraphService.cpp +++ b/src/service/GraphService.cpp @@ -46,7 +46,11 @@ Status GraphService::init(std::shared_ptr ioExecuto LOG(WARNING) << "Failed to synchronously wait for meta service ready"; } - sessionManager_ = std::make_unique(metaClient_.get(), hostAddr); + sessionManager_ = std::make_unique(metaClient_.get(), hostAddr); + auto initSessionMgrStatus = sessionManager_->init(); + if (!initSessionMgrStatus.ok()) { + LOG(WARNING) << "Init sessin manager failed: " << initSessionMgrStatus.toString(); + } queryEngine_ = std::make_unique(); myAddr_ = hostAddr; @@ -121,6 +125,7 @@ GraphService::future_execute(int64_t sessionId, const std::string& query) { auto ctx = std::make_unique>(); ctx->setQuery(query); ctx->setRunner(getThreadManager()); + ctx->setSessionMgr(sessionManager_.get()); auto future = ctx->future(); stats::StatsManager::addValue(kNumQueries); auto cb = [this, sessionId, ctx = std::move(ctx)] diff --git a/src/service/GraphService.h b/src/service/GraphService.h index e1d525ff0..744b8a32d 100644 --- a/src/service/GraphService.h +++ b/src/service/GraphService.h @@ -11,7 +11,7 @@ #include "common/interface/gen-cpp2/GraphService.h" #include "service/Authenticator.h" #include "service/QueryEngine.h" -#include "session/SessionManager.h" +#include "session/GraphSessionManager.h" namespace folly { class IOThreadPoolExecutor; @@ -40,7 +40,7 @@ class GraphService final : public cpp2::GraphServiceSvIf { private: bool auth(const std::string& username, const std::string& password); - std::unique_ptr sessionManager_; + std::unique_ptr sessionManager_; std::unique_ptr queryEngine_; std::unique_ptr metaClient_; HostAddr myAddr_; diff --git a/src/service/PermissionCheck.cpp b/src/service/PermissionCheck.cpp index b34a192ab..717a872e0 100644 --- a/src/service/PermissionCheck.cpp +++ b/src/service/PermissionCheck.cpp @@ -205,6 +205,10 @@ Status PermissionCheck::permissionCheck(ClientSession *session, // No permission checking for sequential sentence. return Status::OK(); } + case Sentence::Kind::kShowQueries: + case Sentence::Kind::kKillQuery: { + return Status::OK(); + } } LOG(ERROR) << "Impossible permission checking for sentence " << sentence->kind(); return Status::Error("Impossible permission checking for sentence %d.", diff --git a/src/service/QueryInstance.cpp b/src/service/QueryInstance.cpp index a8a639be8..4b4f972f7 100644 --- a/src/service/QueryInstance.cpp +++ b/src/service/QueryInstance.cpp @@ -31,6 +31,7 @@ QueryInstance::QueryInstance(std::unique_ptr qctx, Optimizer *opti qctx_ = std::move(qctx); optimizer_ = DCHECK_NOTNULL(optimizer); scheduler_ = std::make_unique(qctx_.get()); + qctx_->rctx()->session()->addQuery(qctx_.get()); } void QueryInstance::execute() { @@ -97,6 +98,7 @@ void QueryInstance::onFinish() { addSlowQueryStats(latency); rctx->finish(); + rctx->session()->deleteQuery(qctx_.get()); // The `QueryInstance' is the root node holding all resources during the execution. // When the whole query process is done, it's safe to release this object, as long as // no other contexts have chances to access these resources later on, @@ -151,6 +153,7 @@ void QueryInstance::onError(Status status) { rctx->resp().latencyInUs = latency; stats::StatsManager::addValue(kNumQueryErrors); addSlowQueryStats(latency); + rctx->session()->deleteQuery(qctx_.get()); rctx->finish(); delete this; } diff --git a/src/service/RequestContext.h b/src/service/RequestContext.h index d1ed4de84..8603a3a7e 100644 --- a/src/service/RequestContext.h +++ b/src/service/RequestContext.h @@ -12,6 +12,7 @@ #include "common/cpp/helpers.h" #include "common/time/Duration.h" #include "session/ClientSession.h" +#include "session/GraphSessionManager.h" /** * RequestContext holds context infos of a specific request from a client. @@ -80,6 +81,14 @@ class RequestContext final : public cpp::NonCopyable, public cpp::NonMovable { promise_.setValue(std::move(resp_)); } + void setSessionMgr(GraphSessionManager* mgr) { + sessionMgr_ = mgr; + } + + GraphSessionManager* sessionMgr() const { + return sessionMgr_; + } + private: time::Duration duration_; std::string query_; @@ -87,6 +96,7 @@ class RequestContext final : public cpp::NonCopyable, public cpp::NonMovable { folly::Promise promise_; std::shared_ptr session_; folly::Executor *runner_{nullptr}; + GraphSessionManager *sessionMgr_{nullptr}; }; } // namespace graph diff --git a/src/session/CMakeLists.txt b/src/session/CMakeLists.txt index 3ef431241..e6d64f0be 100644 --- a/src/session/CMakeLists.txt +++ b/src/session/CMakeLists.txt @@ -4,8 +4,8 @@ # attached with Common Clause Condition 1.0, found in the LICENSES directory. nebula_add_library( - session_obj OBJECT - SessionManager.cpp + graph_session_obj OBJECT + GraphSessionManager.cpp ClientSession.cpp ) diff --git a/src/session/ClientSession.cpp b/src/session/ClientSession.cpp index 64406cb05..72f2e251a 100644 --- a/src/session/ClientSession.cpp +++ b/src/session/ClientSession.cpp @@ -4,19 +4,19 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -#include "common/time/WallClock.h" - #include "session/ClientSession.h" +#include "common/time/WallClock.h" +#include "context/QueryContext.h" namespace nebula { namespace graph { -ClientSession::ClientSession(meta::cpp2::Session &&session, meta::MetaClient* metaClient) { +ClientSession::ClientSession(meta::cpp2::Session&& session, meta::MetaClient* metaClient) { session_ = std::move(session); metaClient_ = metaClient; } -std::shared_ptr ClientSession::create(meta::cpp2::Session &&session, +std::shared_ptr ClientSession::create(meta::cpp2::Session&& session, meta::MetaClient* metaClient) { return std::shared_ptr(new ClientSession(std::move(session), metaClient)); } @@ -32,5 +32,65 @@ uint64_t ClientSession::idleSeconds() { return idleDuration_.elapsedInSec(); } -} // namespace graph -} // namespace nebula +void ClientSession::addQuery(QueryContext* qctx) { + auto epId = qctx->plan()->id(); + meta::cpp2::QueryDesc queryDesc; + queryDesc.set_start_time(time::WallClock::fastNowInMicroSec()); + queryDesc.set_status(meta::cpp2::QueryStatus::RUNNING); + queryDesc.set_query(qctx->rctx()->query()); + queryDesc.set_graph_addr(session_.get_graph_addr()); + VLOG(1) << "Add query: " << qctx->rctx()->query() << ", epId: " << epId; + + folly::RWSpinLock::WriteHolder wHolder(rwSpinLock_); + contexts_.emplace(epId, qctx); + session_.queries_ref()->emplace(epId, std::move(queryDesc)); +} + +void ClientSession::deleteQuery(QueryContext* qctx) { + auto epId = qctx->plan()->id(); + VLOG(1) << "Delete query, epId: " << epId; + folly::RWSpinLock::WriteHolder wHolder(rwSpinLock_); + contexts_.erase(epId); + session_.queries_ref()->erase(epId); +} + +bool ClientSession::findQuery(nebula::ExecutionPlanID epId) { + folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_); + auto context = contexts_.find(epId); + if (context != contexts_.end()) { + return true; + } + + auto query = session_.queries_ref()->find(epId); + if (query != session_.queries_ref()->end()) { + return true; + } + return false; +} + +void ClientSession::markQueryKilled(nebula::ExecutionPlanID epId) { + folly::RWSpinLock::WriteHolder wHolder(rwSpinLock_); + auto context = contexts_.find(epId); + if (context == contexts_.end()) { + return; + } + context->second->markKilled(); + VLOG(1) << "Mark query killed in local cache, epId: " << epId; + + auto query = session_.queries_ref()->find(epId); + if (query == session_.queries_ref()->end()) { + return; + } + query->second.set_status(meta::cpp2::QueryStatus::KILLING); + VLOG(1) << "Mark query killed in meta, epId: " << epId; +} + +void ClientSession::markAllQueryKilled() { + folly::RWSpinLock::WriteHolder wHolder(rwSpinLock_); + for (auto& context : contexts_) { + context.second->markKilled(); + session_.queries_ref()->clear(); + } +} +} // namespace graph +} // namespace nebula diff --git a/src/session/ClientSession.h b/src/session/ClientSession.h index a60d43b1d..de51e52fc 100644 --- a/src/session/ClientSession.h +++ b/src/session/ClientSession.h @@ -13,6 +13,8 @@ namespace nebula { namespace graph { +class QueryContext; + constexpr int64_t kInvalidSpaceID = -1; constexpr int64_t kInvalidSessionID = 0; @@ -94,6 +96,11 @@ class ClientSession final { return session_.get_timezone(); } + HostAddr getGraphAddr() { + folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_); + return session_.get_graph_addr(); + } + void setTimezone(int32_t timezone) { { folly::RWSpinLock::WriteHolder wHolder(rwSpinLock_); @@ -123,12 +130,21 @@ class ClientSession final { session_.set_space_name(spaceName); } + void addQuery(QueryContext* qctx); + + void deleteQuery(QueryContext* qctx); + + bool findQuery(nebula::ExecutionPlanID epId); + + void markQueryKilled(nebula::ExecutionPlanID epId); + + void markAllQueryKilled(); + private: ClientSession() = default; explicit ClientSession(meta::cpp2::Session &&session, meta::MetaClient* metaClient); - private: SpaceInfo space_; time::Duration idleDuration_; @@ -141,6 +157,7 @@ class ClientSession final { * But a user has only one role in one space */ std::unordered_map roles_; + std::unordered_map contexts_; }; } // namespace graph diff --git a/src/session/SessionManager.cpp b/src/session/GraphSessionManager.cpp similarity index 58% rename from src/session/SessionManager.cpp rename to src/session/GraphSessionManager.cpp index 9373174c4..2196dd60e 100644 --- a/src/session/SessionManager.cpp +++ b/src/session/GraphSessionManager.cpp @@ -4,35 +4,22 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ +#include "session/GraphSessionManager.h" #include "common/base/Base.h" -#include "session/SessionManager.h" +#include "common/time/WallClock.h" #include "service/GraphFlags.h" namespace nebula { namespace graph { -SessionManager::SessionManager(meta::MetaClient* metaClient, const HostAddr &hostAddr) { - metaClient_ = metaClient; - myAddr_ = hostAddr; - scavenger_ = std::make_unique(); - auto ok = scavenger_->start("session-manager"); - DCHECK(ok); - scavenger_->addDelayTask(FLAGS_session_reclaim_interval_secs * 1000, - &SessionManager::threadFunc, - this); -} - - -SessionManager::~SessionManager() { - if (scavenger_ != nullptr) { - scavenger_->stop(); - scavenger_->wait(); - scavenger_.reset(); - } +GraphSessionManager::GraphSessionManager(meta::MetaClient* metaClient, const HostAddr& hostAddr) + : SessionManager(metaClient, hostAddr) { + scavenger_->addDelayTask( + FLAGS_session_reclaim_interval_secs * 1000, &GraphSessionManager::threadFunc, this); } folly::Future>> -SessionManager::findSession(SessionID id, folly::Executor* runner) { +GraphSessionManager::findSession(SessionID id, folly::Executor* runner) { // When the sessionId is 0, it means the clients to ping the connection is ok if (id == 0) { return folly::makeFuture(Status::Error("SessionId is invalid")).via(runner); @@ -46,8 +33,7 @@ SessionManager::findSession(SessionID id, folly::Executor* runner) { return findSessionFromMetad(id, runner); } -std::shared_ptr SessionManager::findSessionFromCache(SessionID id) { - folly::RWSpinLock::ReadHolder rHolder(rwlock_); +std::shared_ptr GraphSessionManager::findSessionFromCache(SessionID id) { auto iter = activeSessions_.find(id); if (iter == activeSessions_.end()) { return nullptr; @@ -58,7 +44,7 @@ std::shared_ptr SessionManager::findSessionFromCache(SessionID id folly::Future>> -SessionManager::findSessionFromMetad(SessionID id, folly::Executor* runner) { +GraphSessionManager::findSessionFromMetad(SessionID id, folly::Executor* runner) { VLOG(1) << "Find session `" << id << "' from metad"; // local cache not found, need to get from metad auto addSession = [this, id] (auto &&resp) -> StatusOr> { @@ -68,6 +54,7 @@ SessionManager::findSessionFromMetad(SessionID id, folly::Executor* runner) { "Session `%ld' not found: %s", id, resp.status().toString().c_str()); } auto session = resp.value().get_session(); + session.queries_ref()->clear(); auto spaceName = session.get_space_name(); SpaceInfo spaceInfo; if (!spaceName.empty()) { @@ -89,13 +76,16 @@ SessionManager::findSessionFromMetad(SessionID id, folly::Executor* runner) { } { - folly::RWSpinLock::WriteHolder wHolder(rwlock_); auto findPtr = activeSessions_.find(id); if (findPtr == activeSessions_.end()) { VLOG(1) << "Add session id: " << id << " from metad"; + session.set_graph_addr(myAddr_); auto sessionPtr = ClientSession::create(std::move(session), metaClient_); sessionPtr->charge(); - activeSessions_[id] = sessionPtr; + auto ret = activeSessions_.emplace(id, sessionPtr); + if (!ret.second) { + return Status::Error("Insert session to local cache failed."); + } // update the space info to sessionPtr if (!spaceName.empty()) { @@ -114,7 +104,7 @@ SessionManager::findSessionFromMetad(SessionID id, folly::Executor* runner) { } folly::Future>> -SessionManager::createSession(const std::string userName, +GraphSessionManager::createSession(const std::string userName, const std::string clientIp, folly::Executor* runner) { auto createCB = [this, userName = userName] @@ -127,13 +117,15 @@ SessionManager::createSession(const std::string userName, auto sid = session.get_session_id(); DCHECK_NE(sid, 0L); { - folly::RWSpinLock::WriteHolder wHolder(rwlock_); auto findPtr = activeSessions_.find(sid); if (findPtr == activeSessions_.end()) { VLOG(1) << "Create session id: " << sid << ", for user: " << userName; auto sessionPtr = ClientSession::create(std::move(session), metaClient_); sessionPtr->charge(); - activeSessions_[sid] = sessionPtr; + auto ret = activeSessions_.emplace(sid, sessionPtr); + if (!ret.second) { + return Status::Error("Insert session to local cache failed."); + } updateSessionInfo(sessionPtr.get()); return sessionPtr; } @@ -147,13 +139,13 @@ SessionManager::createSession(const std::string userName, .thenValue(createCB); } -void SessionManager::removeSession(SessionID id) { - folly::RWSpinLock::WriteHolder wHolder(rwlock_); +void GraphSessionManager::removeSession(SessionID id) { auto iter = activeSessions_.find(id); if (iter == activeSessions_.end()) { return; } + iter->second->markAllQueryKilled(); auto resp = metaClient_->removeSession(id).get(); if (!resp.ok()) { // it will delete by reclaim @@ -162,21 +154,20 @@ void SessionManager::removeSession(SessionID id) { activeSessions_.erase(iter); } -void SessionManager::threadFunc() { +void GraphSessionManager::threadFunc() { reclaimExpiredSessions(); updateSessionsToMeta(); scavenger_->addDelayTask(FLAGS_session_reclaim_interval_secs * 1000, - &SessionManager::threadFunc, + &GraphSessionManager::threadFunc, this); } // TODO(dutor) Now we do a brute-force scanning, of course we could make it more efficient. -void SessionManager::reclaimExpiredSessions() { +void GraphSessionManager::reclaimExpiredSessions() { if (FLAGS_session_idle_timeout_secs == 0) { return; } - folly::RWSpinLock::WriteHolder wHolder(rwlock_); if (activeSessions_.empty()) { return; } @@ -193,6 +184,7 @@ void SessionManager::reclaimExpiredSessions() { } FLOG_INFO("ClientSession %ld has expired", iter->first); + iter->second->markAllQueryKilled(); auto resp = metaClient_->removeSession(iter->first).get(); if (!resp.ok()) { // TODO: Handle cases where the delete client failed @@ -203,33 +195,97 @@ void SessionManager::reclaimExpiredSessions() { } } -void SessionManager::updateSessionsToMeta() { +void GraphSessionManager::updateSessionsToMeta() { std::vector sessions; { - folly::RWSpinLock::ReadHolder rHolder(rwlock_); if (activeSessions_.empty()) { return; } for (auto &ses : activeSessions_) { - if (ses.second->getSession().get_graph_addr() == myAddr_) { - VLOG(3) << "Add Update session id: " << ses.second->getSession().get_session_id(); - sessions.emplace_back(ses.second->getSession()); + VLOG(3) << "Add Update session id: " << ses.second->getSession().get_session_id(); + auto sessionCopy = ses.second->getSession(); + for (auto& query : *sessionCopy.queries_ref()) { + query.second.set_duration(time::WallClock::fastNowInMicroSec() - + query.second.get_start_time()); } + sessions.emplace_back(std::move(sessionCopy)); } } - auto resp = metaClient_->updateSessions(sessions).get(); - if (!resp.ok()) { - LOG(ERROR) << "Update sessions failed: " << resp.status(); + + auto handleKilledQueries = [this] (auto&& resp) { + if (!resp.ok()) { + LOG(ERROR) << "Update sessions failed: " << resp.status(); + return Status::Error("Update sessions failed: %s", + resp.status().toString().c_str()); + } + auto& killedQueriesForEachSession = *resp.value().killed_queries_ref(); + for (auto& killedQueries : killedQueriesForEachSession) { + auto sessionId = killedQueries.first; + for (auto& desc : killedQueries.second) { + auto session = activeSessions_.find(sessionId); + if (session == activeSessions_.end()) { + continue; + } + if (desc.second.get_graph_addr() != + session->second->getGraphAddr()) { + continue; + } + auto epId = desc.first; + session->second->markQueryKilled(epId); + VLOG(1) + << "Kill query, session: " << sessionId << " plan: " << epId; + } + } + return Status::OK(); + }; + + auto result = metaClient_->updateSessions(sessions) + .thenValue(handleKilledQueries) + .get(); + if (!result.ok()) { + LOG(ERROR) << "Update sessions failed: " << result; } } -void SessionManager::updateSessionInfo(ClientSession* session) { +void GraphSessionManager::updateSessionInfo(ClientSession* session) { session->updateGraphAddr(myAddr_); auto roles = metaClient_->getRolesByUserFromCache(session->user()); for (const auto &role : roles) { session->setRole(role.get_space_id(), role.get_role_type()); } } + +Status GraphSessionManager::init() { + auto listSessionsRet = metaClient_->listSessions().get(); + if (!listSessionsRet.ok()) { + return Status::Error("Load sessions from meta failed."); + } + auto& sessions = *listSessionsRet.value().sessions_ref(); + for (auto& session : sessions) { + if (session.get_graph_addr() != myAddr_) { + continue; + } + auto sessionId = session.get_session_id(); + auto idleSecs = + (time::WallClock::fastNowInMicroSec() - session.get_update_time()) / 1000000; + VLOG(1) << "session_idle_timeout_secs: " << FLAGS_session_idle_timeout_secs + << " idleSecs: " << idleSecs; + if (FLAGS_session_idle_timeout_secs > 0 && idleSecs > FLAGS_session_idle_timeout_secs) { + // remove session if expired + VLOG(1) << "Remove session: " << sessionId; + metaClient_->removeSession(sessionId); + continue; + } + session.queries_ref()->clear(); + auto sessionPtr = ClientSession::create(std::move(session), metaClient_); + auto ret = activeSessions_.emplace(sessionId, sessionPtr); + if (!ret.second) { + return Status::Error("Insert session to local cache failed."); + } + updateSessionInfo(sessionPtr.get()); + } + return Status::OK(); +} } // namespace graph } // namespace nebula diff --git a/src/session/SessionManager.h b/src/session/GraphSessionManager.h similarity index 63% rename from src/session/SessionManager.h rename to src/session/GraphSessionManager.h index 0de170c64..25c506799 100644 --- a/src/session/SessionManager.h +++ b/src/session/GraphSessionManager.h @@ -4,32 +4,32 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -#ifndef SESSION_SESSIONMANAGER_H_ -#define SESSION_SESSIONMANAGER_H_ +#ifndef SESSION_GRAPHSESSIONMANAGER_H_ +#define SESSION_GRAPHSESSIONMANAGER_H_ +#include "common/session/SessionManager.h" #include "common/base/Base.h" #include "common/thrift/ThriftTypes.h" #include "common/base/StatusOr.h" #include "common/thread/GenericWorker.h" #include "common/clients/meta/MetaClient.h" #include "common/interface/gen-cpp2/GraphService.h" - #include "session/ClientSession.h" -#include "service/RequestContext.h" /** - * SessionManager manages the client sessions, e.g. create new, find existing and drop expired. + * GraphSessionManager manages the client sessions, e.g. create new, find existing and drop expired. */ DECLARE_int64(max_allowed_connections); namespace nebula { namespace graph { - -class SessionManager final { +class GraphSessionManager final : public SessionManager { public: - SessionManager(meta::MetaClient* metaClient, const HostAddr &hostAddr); - ~SessionManager(); + GraphSessionManager(meta::MetaClient* metaClient, const HostAddr &hostAddr); + ~GraphSessionManager() {} + + Status init(); /** * Create a new session @@ -37,10 +37,9 @@ class SessionManager final { folly::Future>> createSession(const std::string userName, const std::string clientIp, - folly::Executor* runner); + folly::Executor* runner) override; bool isOutOfConnections() { - folly::RWSpinLock::ReadHolder rHolder(rwlock_); if (activeSessions_.size() >= static_cast(FLAGS_max_allowed_connections)) { LOG(INFO) << "The sessions of the cluster has more than max_allowed_connections: " << FLAGS_max_allowed_connections; @@ -52,17 +51,17 @@ class SessionManager final { /** * Remove a session */ - void removeSession(SessionID id); + void removeSession(SessionID id) override; folly::Future>> - findSession(SessionID id, folly::Executor* runner); + findSession(SessionID id, folly::Executor* runner) override; -private: /** * Find an existing session */ std::shared_ptr findSessionFromCache(SessionID id); +private: folly::Future>> findSessionFromMetad(SessionID id, folly::Executor* runner); @@ -73,18 +72,10 @@ class SessionManager final { void updateSessionsToMeta(); void updateSessionInfo(ClientSession* session); - -private: - using SessionPtr = std::shared_ptr; - folly::RWSpinLock rwlock_; // TODO(dutor) writer might starve - std::unordered_map activeSessions_; - std::unique_ptr scavenger_; - meta::MetaClient *metaClient_{nullptr}; - HostAddr myAddr_; }; } // namespace graph } // namespace nebula -#endif // SESSION_SESSIONMANAGER_H_ +#endif // SESSION_GRAPHSESSIONMANAGER_H_ diff --git a/src/util/ExpressionUtils.cpp b/src/util/ExpressionUtils.cpp index d0557ea52..6bdae94d9 100644 --- a/src/util/ExpressionUtils.cpp +++ b/src/util/ExpressionUtils.cpp @@ -775,6 +775,5 @@ Expression *ExpressionUtils::equalCondition(ObjectPool *pool, return RelationalExpression::makeEQ( pool, VariableExpression::make(pool, var), ConstantExpression::make(pool, value)); } - } // namespace graph } // namespace nebula diff --git a/src/util/test/CMakeLists.txt b/src/util/test/CMakeLists.txt index 92ff45227..894ed01a1 100644 --- a/src/util/test/CMakeLists.txt +++ b/src/util/test/CMakeLists.txt @@ -34,7 +34,7 @@ nebula_add_test( $ $ $ - $ + $ $ $ $ diff --git a/src/validator/AdminValidator.cpp b/src/validator/AdminValidator.cpp index ee4b77d38..e972fa5b8 100644 --- a/src/validator/AdminValidator.cpp +++ b/src/validator/AdminValidator.cpp @@ -15,6 +15,7 @@ #include "planner/plan/Admin.h" #include "planner/plan/Query.h" #include "service/GraphFlags.h" +#include "util/ExpressionUtils.h" #include "util/SchemaUtil.h" namespace nebula { @@ -521,5 +522,63 @@ Status ShowSessionsValidator::toPlan() { return Status::OK(); } +Status ShowQueriesValidator::validateImpl() { + if (!inputs_.empty()) { + return Status::SemanticError("Show queries sentence do not support input"); + } + outputs_.emplace_back("SessionID", Value::Type::INT); + outputs_.emplace_back("ExecutionPlanID", Value::Type::INT); + outputs_.emplace_back("User", Value::Type::STRING); + outputs_.emplace_back("Host", Value::Type::STRING); + outputs_.emplace_back("StartTime", Value::Type::DATETIME); + outputs_.emplace_back("DurationInUSec", Value::Type::INT); + outputs_.emplace_back("Status", Value::Type::STRING); + outputs_.emplace_back("Query", Value::Type::STRING); + return Status::OK(); +} + +Status ShowQueriesValidator::toPlan() { + auto sentence = static_cast(sentence_); + auto *node = ShowQueries::make(qctx_, nullptr, sentence->isAll()); + root_ = node; + tail_ = root_; + return Status::OK(); +} + +Status KillQueryValidator::validateImpl() { + auto sentence = static_cast(sentence_); + auto *sessionExpr = sentence->sessionId(); + auto *epIdExpr = sentence->epId(); + auto sessionTypeStatus = deduceExprType(sessionExpr); + if (!sessionTypeStatus.ok()) { + return sessionTypeStatus.status(); + } + if (sessionTypeStatus.value() != Value::Type::INT) { + std::stringstream ss; + ss << sessionExpr->toString() << ", Session ID must be an integer but was " + << sessionTypeStatus.value(); + return Status::SemanticError(ss.str()); + } + auto epIdStatus = deduceExprType(epIdExpr); + if (!epIdStatus.ok()) { + return epIdStatus.status(); + } + if (epIdStatus.value() != Value::Type::INT) { + std::stringstream ss; + ss << epIdExpr->toString() << ", Session ID must be an integer but was " + << epIdStatus.value(); + return Status::SemanticError(ss.str()); + } + + return Status::OK(); +} + +Status KillQueryValidator::toPlan() { + auto sentence = static_cast(sentence_); + auto *node = KillQuery::make(qctx_, nullptr, sentence->sessionId(), sentence->epId()); + root_ = node; + tail_ = root_; + return Status::OK(); +} } // namespace graph } // namespace nebula diff --git a/src/validator/AdminValidator.h b/src/validator/AdminValidator.h index 8c783b66c..1558f598e 100644 --- a/src/validator/AdminValidator.h +++ b/src/validator/AdminValidator.h @@ -346,6 +346,32 @@ class GetSessionValidator final : public Validator { private: SessionID sessionId_{0}; }; + +class ShowQueriesValidator final : public Validator { +public: + ShowQueriesValidator(Sentence* sentence, QueryContext* context) + : Validator(sentence, context) { + setNoSpaceRequired(); + } + +private: + Status validateImpl() override; + + Status toPlan() override; +}; + +class KillQueryValidator final : public Validator { +public: + KillQueryValidator(Sentence* sentence, QueryContext* context) + : Validator(sentence, context) { + setNoSpaceRequired(); + } + +private: + Status validateImpl() override; + + Status toPlan() override; +}; } // namespace graph } // namespace nebula #endif // VALIDATOR_ADMINVALIDATOR_H_ diff --git a/src/validator/GroupByValidator.h b/src/validator/GroupByValidator.h index 656740ec8..1b93bf242 100644 --- a/src/validator/GroupByValidator.h +++ b/src/validator/GroupByValidator.h @@ -16,7 +16,8 @@ namespace graph { class GroupByValidator final : public Validator { public: - GroupByValidator(Sentence *sentence, QueryContext *context) : Validator(sentence, context) {} + GroupByValidator(Sentence *sentence, QueryContext *context) + : Validator(sentence, context) {} Status validateImpl() override; diff --git a/src/validator/LimitValidator.h b/src/validator/LimitValidator.h index eff70cfcb..d24cd7ee2 100644 --- a/src/validator/LimitValidator.h +++ b/src/validator/LimitValidator.h @@ -14,7 +14,9 @@ namespace graph { class LimitValidator final : public Validator { public: LimitValidator(Sentence* sentence, QueryContext* context) - : Validator(sentence, context) {} + : Validator(sentence, context) { + setNoSpaceRequired(); + } private: Status validateImpl() override; diff --git a/src/validator/OrderByValidator.h b/src/validator/OrderByValidator.h index fefbf4c2e..4f28c7017 100644 --- a/src/validator/OrderByValidator.h +++ b/src/validator/OrderByValidator.h @@ -14,7 +14,9 @@ namespace graph { class OrderByValidator final : public Validator { public: OrderByValidator(Sentence* sentence, QueryContext* context) - : Validator(sentence, context) {} + : Validator(sentence, context) { + setNoSpaceRequired(); + } private: Status validateImpl() override; diff --git a/src/validator/Validator.cpp b/src/validator/Validator.cpp index 29e7a9bd3..0b49b5b10 100644 --- a/src/validator/Validator.cpp +++ b/src/validator/Validator.cpp @@ -249,6 +249,10 @@ std::unique_ptr Validator::makeValidator(Sentence* sentence, QueryCon case Sentence::Kind::kShowZones: case Sentence::Kind::kShowSessions: return std::make_unique(sentence, context); + case Sentence::Kind::kShowQueries: + return std::make_unique(sentence, context); + case Sentence::Kind::kKillQuery: + return std::make_unique(sentence, context); case Sentence::Kind::kUnknown: case Sentence::Kind::kReturn: { // nothing diff --git a/src/validator/test/CMakeLists.txt b/src/validator/test/CMakeLists.txt index 3b9473555..c6ae1b674 100644 --- a/src/validator/test/CMakeLists.txt +++ b/src/validator/test/CMakeLists.txt @@ -15,12 +15,12 @@ set(VALIDATOR_TEST_LIBS $ $ $ - $ $ $ $ $ $ + $ $ $ $ diff --git a/src/visitor/test/CMakeLists.txt b/src/visitor/test/CMakeLists.txt index 59838a5a6..a89edb54e 100644 --- a/src/visitor/test/CMakeLists.txt +++ b/src/visitor/test/CMakeLists.txt @@ -20,7 +20,7 @@ nebula_add_test( $ $ $ - $ + $ $ $ $ diff --git a/tests/Makefile b/tests/Makefile index a56af890a..350db9123 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -57,11 +57,11 @@ test: && python3 -m pytest --rm_dir=$(RM_DIR) -m "not skip" -k "not tck" job/test_session.py tck: - cd $(CURR_DIR) && python3 -m pytest --cucumber-json=$(CURR_DIR)/tck-report.json --cucumber-json-expanded -n$(J) --rm_dir=$(RM_DIR) -m "not skip" $(CURR_DIR)/tck/ + cd $(CURR_DIR) && python3 -m pytest --cucumber-json=$(CURR_DIR)/tck-report.json --cucumber-json-expanded -n$(J) --rm_dir=$(RM_DIR) -m "not skip" $(CURR_DIR)/tck/steps/test_tck.py && python3 -m pytest --cucumber-json=$(CURR_DIR)/tck-report.json --cucumber-json-expanded -n$(J) --rm_dir=$(RM_DIR) -m "not skip" $(CURR_DIR)/tck/steps/test_kill_slow_query_via_same_service.py && python3 -m pytest --cucumber-json=$(CURR_DIR)/tck-report.json --cucumber-json-expanded -n$(J) --rm_dir=$(RM_DIR) -m "not skip" $(CURR_DIR)/tck/steps/test_kill_slow_query_via_different_service.py -fail: - cd $(CURR_DIR) && python3 -m pytest --last-failed --gherkin-terminal-reporter --gherkin-terminal-reporter-expanded --rm_dir=$(RM_DIR) -m "not skip" $(CURR_DIR)/tck/ +fail: + cd $(CURR_DIR) && python3 -m pytest --last-failed --gherkin-terminal-reporter --gherkin-terminal-reporter-expanded --rm_dir=$(RM_DIR) -m "not skip" $(CURR_DIR)/tck/steps/test_tck.py report: @mv $(CURR_DIR)/tck-report.json $(CURR_DIR)/tck-report-bak.json @jq . $(CURR_DIR)/tck-report-bak.json > tck-report.json diff --git a/tests/common/nebula_service.py b/tests/common/nebula_service.py index 1a9776c6b..64f326ade 100644 --- a/tests/common/nebula_service.py +++ b/tests/common/nebula_service.py @@ -205,7 +205,7 @@ def start(self, debug_log=True, multi_graphd=False): self._collect_pids() - return graph_ports[0] + return graph_ports def _collect_pids(self): for pf in glob.glob(self.work_dir + '/pids/*.pid'): diff --git a/tests/conftest.py b/tests/conftest.py index 288b17e14..d1bb4c235 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -71,6 +71,14 @@ def pytest_configure(config): def get_port(): + with open(NB_TMP_PATH, "r") as f: + data = json.loads(f.readline()) + port = data.get("port", None) + if port is None: + raise Exception(f"Invalid port: {port}") + return port[0] + +def get_ports(): with open(NB_TMP_PATH, "r") as f: data = json.loads(f.readline()) port = data.get("port", None) @@ -78,25 +86,47 @@ def get_port(): raise Exception(f"Invalid port: {port}") return port +@pytest.fixture(scope="session") +def conn_pool_to_first_graph_service(pytestconfig): + addr = pytestconfig.getoption("address") + host_addr = addr.split(":") if addr else ["localhost", get_ports()[0]] + assert len(host_addr) == 2 + pool = get_conn_pool(host_addr[0], host_addr[1]) + yield pool + pool.close() @pytest.fixture(scope="session") -def conn_pool(pytestconfig): +def conn_pool_to_second_graph_service(pytestconfig): addr = pytestconfig.getoption("address") - host_addr = addr.split(":") if addr else ["localhost", get_port()] + host_addr = ["localhost", get_ports()[1]] assert len(host_addr) == 2 pool = get_conn_pool(host_addr[0], host_addr[1]) yield pool pool.close() +@pytest.fixture(scope="session") +def conn_pool(conn_pool_to_first_graph_service): + return conn_pool_to_first_graph_service + +@pytest.fixture(scope="class") +def session_from_first_conn_pool(conn_pool_to_first_graph_service, pytestconfig): + user = pytestconfig.getoption("user") + password = pytestconfig.getoption("password") + sess = conn_pool_to_first_graph_service.get_session(user, password) + yield sess + sess.release() @pytest.fixture(scope="class") -def session(conn_pool, pytestconfig): +def session_from_second_conn_pool(conn_pool_to_second_graph_service, pytestconfig): user = pytestconfig.getoption("user") password = pytestconfig.getoption("password") - sess = conn_pool.get_session(user, password) + sess = conn_pool_to_second_graph_service.get_session(user, password) yield sess sess.release() +@pytest.fixture(scope="class") +def session(session_from_first_conn_pool): + return session_from_first_conn_pool def load_csv_data_once(space: str): with open(SPACE_TMP_PATH, "r") as f: diff --git a/tests/job/test_session.py b/tests/job/test_session.py index 52c95f5e8..2dbaa8365 100644 --- a/tests/job/test_session.py +++ b/tests/job/test_session.py @@ -151,7 +151,13 @@ def get_connection(ip, port): resp = conn1.execute(session_id, 'CREATE SPACE IF NOT EXISTS aSpace(partition_num=1, vid_type=FIXED_STRING(8));USE aSpace;') self.check_resp_succeeded(ResultSet(resp, 0)) + # time::WallClock::fastNowInMicroSec() is not syncronous in different process, + # so we sleep 3 seconds here and charge session time.sleep(3) + resp = conn1.execute(session_id, 'USE aSpace;') + self.check_resp_succeeded(ResultSet(resp, 0)) + time.sleep(3) + # We actually not allowed share sessions, this only for testing the scenario of transfer sessions. resp = conn1.execute(session_id, 'CREATE TAG IF NOT EXISTS a();') self.check_resp_succeeded(ResultSet(resp, 0)) resp = conn2.execute(session_id, 'CREATE TAG IF NOT EXISTS b();') diff --git a/tests/nebula-test-run.py b/tests/nebula-test-run.py index 826f9b288..d01e0c642 100755 --- a/tests/nebula-test-run.py +++ b/tests/nebula-test-run.py @@ -57,14 +57,14 @@ def start_nebula(nb, configs): if len(configs.address.split(':')) != 2: raise Exception('Invalid address, address is {}'.format(configs.address)) address, port = configs.address.split(':') - port = int(port) + ports = [int(port)] else: nb.install() address = "localhost" - port = nb.start(multi_graphd=configs.multi_graphd) + ports = nb.start(multi_graphd=configs.multi_graphd) # Load csv data - pool = get_conn_pool(address, port) + pool = get_conn_pool("localhost", ports[0]) sess = pool.get_session(configs.user, configs.password) if not os.path.exists(TMP_DIR): @@ -81,7 +81,7 @@ def start_nebula(nb, configs): with open(NB_TMP_PATH, "w") as f: data = { "ip": "localhost", - "port": port, + "port": ports, "work_dir": nb.work_dir } f.write(json.dumps(data)) @@ -92,7 +92,7 @@ def stop_nebula(nb, configs=None): if configs.address is not None and configs.address != "": print('test remote nebula graph, no need to stop nebula.') return - + with open(NB_TMP_PATH, "r") as f: data = json.loads(f.readline()) nb.set_work_dir(data["work_dir"]) diff --git a/tests/tck/conftest.py b/tests/tck/conftest.py index 230c61e86..0c51ebc69 100644 --- a/tests/tck/conftest.py +++ b/tests/tck/conftest.py @@ -456,3 +456,12 @@ def check_plan(plan, graph_spaces): rows[i] = row differ = PlanDiffer(resp.plan_desc(), expect) assert differ.diff(), differ.err_msg() + +@when(parse("executing query via graph {index:d}:\n{query}")) +def executing_query(query, index, graph_spaces, session_from_first_conn_pool, session_from_second_conn_pool, request): + assert index < 2, "There exists only 0,1 graph: {}".format(index) + ngql = combine_query(query) + if index == 0: + exec_query(request, ngql, session_from_first_conn_pool, graph_spaces) + else: + exec_query(request, ngql, session_from_second_conn_pool, graph_spaces) diff --git a/tests/tck/slowquery/KillSlowQueryViaDiffrentService.feature b/tests/tck/slowquery/KillSlowQueryViaDiffrentService.feature new file mode 100644 index 000000000..94924e72a --- /dev/null +++ b/tests/tck/slowquery/KillSlowQueryViaDiffrentService.feature @@ -0,0 +1,91 @@ +# Copyright (c) 2021 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License, +# attached with Common Clause Condition 1.0, found in the LICENSES directory. +Feature: Slow Query Test + + # There should be a least 2 thread to run this test case suite. + Scenario: Set up slow query at first graph service + # Set up a slow query which will be killed later. + Given a graph with space named "nba" + When executing query via graph 0: + """ + GO 100000 STEPS FROM "Tim Duncan" OVER like + """ + Then an ExecutionError should be raised at runtime: Execution had been killed + + Scenario: Show all queries and kill all slow queries at second graph service + When executing query via graph 1: + """ + SHOW QUERIES + """ + Then the execution should be successful + When executing query via graph 1: + """ + SHOW ALL QUERIES + """ + Then the execution should be successful + And wait 20 seconds + When executing query via graph 1: + """ + SHOW ALL QUERIES + """ + Then the result should be, in order: + | SessionID | ExecutionPlanID | User | Host | StartTime | DurationInUSec | Status | Query | + | /\d+/ | /\d+/ | "root" | /.*/ | /.*/ | /\d+/ | "RUNNING" | "GO 100000 STEPS FROM \"Tim Duncan\" OVER like" | + When executing query via graph 1: + """ + SHOW ALL QUERIES + | YIELD $-.SessionID AS sid, $-.ExecutionPlanID AS eid, $-.DurationInUSec AS dur + WHERE $-.DurationInUSec > 1000000 AND $-.`Query` CONTAINS "GO 100000 STEPS"; + """ + Then the result should be, in order: + | sid | eid | dur | + | /\d+/ | /\d+/ | /\d+/ | + When executing query via graph 1: + """ + KILL QUERY () + """ + Then a SyntaxError should be raised at runtime: syntax error near `)' + When executing query via graph 1: + """ + KILL QUERY (session=123) + """ + Then a SyntaxError should be raised at runtime: syntax error near `)' + When executing query via graph 1: + """ + KILL QUERY (plan=987654321) + """ + Then an ExecutionError should be raised at runtime: ExecutionPlanId[987654321] does not exist in current Session. + When executing query via graph 1: + """ + KILL QUERY (session=987654321, plan=987654321) + """ + Then an ExecutionError should be raised at runtime: SessionId[987654321] does not exist + When executing query via graph 1: + """ + KILL QUERY (session=$-.sid, plan=$-.eid) + """ + Then an SemanticError should be raised at runtime: `$-.sid', not exist prop `sid' + When executing query via graph 1: + """ + KILL QUERY (plan=$-.eid) + """ + Then an SemanticError should be raised at runtime: `$-.eid', not exist prop `eid' + When executing query via graph 1: + """ + SHOW ALL QUERIES + | YIELD $-.SessionID AS sid, $-.`Query` AS eid, $-.DurationInUSec AS dur WHERE $-.DurationInUSec > 10000000 + | ORDER BY $-.dur + | KILL QUERY (session=$-.sid, plan=$-.eid) + """ + Then an SemanticError should be raised at runtime: $-.eid, Session ID must be an integer but was STRING + When executing query via graph 1: + """ + SHOW ALL QUERIES + | YIELD $-.SessionID AS sid, $-.ExecutionPlanID AS eid, $-.DurationInUSec AS dur + WHERE $-.DurationInUSec > 1000000 AND $-.`Query` CONTAINS "GO" + | ORDER BY $-.dur + | KILL QUERY(session=$-.sid, plan=$-.eid) + """ + Then the execution should be successful diff --git a/tests/tck/slowquery/KillSlowQueryViaSameService.feature b/tests/tck/slowquery/KillSlowQueryViaSameService.feature new file mode 100644 index 000000000..e0140e363 --- /dev/null +++ b/tests/tck/slowquery/KillSlowQueryViaSameService.feature @@ -0,0 +1,91 @@ +# Copyright (c) 2021 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License, +# attached with Common Clause Condition 1.0, found in the LICENSES directory. +Feature: Slow Query Test + + # There should be a least 2 thread to run this test case suite. + Scenario: Set up slow query + # Set up a slow query which will be killed later. + Given a graph with space named "nba" + When executing query: + """ + GO 100000 STEPS FROM "Tim Duncan" OVER like + """ + Then an ExecutionError should be raised at runtime: Execution had been killed + + Scenario: Show all queries and kill all slow queries + When executing query: + """ + SHOW QUERIES + """ + Then the execution should be successful + When executing query: + """ + SHOW ALL QUERIES + """ + Then the execution should be successful + And wait 20 seconds + When executing query: + """ + SHOW ALL QUERIES + """ + Then the result should be, in order: + | SessionID | ExecutionPlanID | User | Host | StartTime | DurationInUSec | Status | Query | + | /\d+/ | /\d+/ | "root" | /.*/ | /.*/ | /\d+/ | "RUNNING" | "GO 100000 STEPS FROM \"Tim Duncan\" OVER like" | + When executing query: + """ + SHOW ALL QUERIES + | YIELD $-.SessionID AS sid, $-.ExecutionPlanID AS eid, $-.DurationInUSec AS dur + WHERE $-.DurationInUSec > 1000 AND $-.`Query` CONTAINS "GO 100000 STEPS"; + """ + Then the result should be, in order: + | sid | eid | dur | + | /\d+/ | /\d+/ | /\d+/ | + When executing query: + """ + KILL QUERY () + """ + Then a SyntaxError should be raised at runtime: syntax error near `)' + When executing query: + """ + KILL QUERY (session=123) + """ + Then a SyntaxError should be raised at runtime: syntax error near `)' + When executing query: + """ + KILL QUERY (plan=987654321) + """ + Then an ExecutionError should be raised at runtime: ExecutionPlanId[987654321] does not exist in current Session. + When executing query: + """ + KILL QUERY (session=987654321, plan=987654321) + """ + Then an ExecutionError should be raised at runtime: SessionId[987654321] does not exist + When executing query: + """ + KILL QUERY (session=$-.sid, plan=$-.eid) + """ + Then an SemanticError should be raised at runtime: `$-.sid', not exist prop `sid' + When executing query: + """ + KILL QUERY (plan=$-.eid) + """ + Then an SemanticError should be raised at runtime: `$-.eid', not exist prop `eid' + When executing query: + """ + SHOW ALL QUERIES + | YIELD $-.SessionID AS sid, $-.`Query` AS eid, $-.DurationInUSec AS dur WHERE $-.DurationInUSec > 10000000 + | ORDER BY $-.dur + | KILL QUERY (session=$-.sid, plan=$-.eid) + """ + Then an SemanticError should be raised at runtime: $-.eid, Session ID must be an integer but was STRING + When executing query: + """ + SHOW ALL QUERIES + | YIELD $-.SessionID AS sid, $-.ExecutionPlanID AS eid, $-.DurationInUSec AS dur + WHERE $-.DurationInUSec > 10000000 AND $-.`Query` CONTAINS "GO" + | ORDER BY $-.dur + | KILL QUERY(session=$-.sid, plan=$-.eid) + """ + Then the execution should be successful diff --git a/tests/tck/steps/test_kill_slow_query_via_different_service.py b/tests/tck/steps/test_kill_slow_query_via_different_service.py new file mode 100644 index 000000000..2a94ae4e6 --- /dev/null +++ b/tests/tck/steps/test_kill_slow_query_via_different_service.py @@ -0,0 +1,9 @@ +# Copyright (c) 2021 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License, +# attached with Common Clause Condition 1.0, found in the LICENSES directory. + +from pytest_bdd import scenarios + + +scenarios('slowquery/KillSlowQueryViaDiffrentService.feature') diff --git a/tests/tck/steps/test_kill_slow_query_via_same_service.py b/tests/tck/steps/test_kill_slow_query_via_same_service.py new file mode 100644 index 000000000..50cceaec5 --- /dev/null +++ b/tests/tck/steps/test_kill_slow_query_via_same_service.py @@ -0,0 +1,9 @@ +# Copyright (c) 2021 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License, +# attached with Common Clause Condition 1.0, found in the LICENSES directory. + +from pytest_bdd import scenarios + + +scenarios('slowquery/KillSlowQueryViaSameService.feature')