Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
Slow query. (#1152)
Browse files Browse the repository at this point in the history
* Add graph session manager.

* Fix usage of concurrent map.

* Use emplace.

* Save the queries to session.

* Kill query when execute.

* Update the killed query from meta.

* Add show and kill query sentence.

* Add validator and node.

* Support kill query.

* Kill the query if session is destroyed and update addr to session if get from meta.

* Fix update the killed queries from meta and delete query if execute error.

* Load all unexpired sessions from meta, remove all expired sessions, remove all queries saved in sessions when start graph.

* Add toString and test.

* Mark query killed when session sign out or expired.

* Add explain.

* Add test and fix top N.

* Make show queries as a traverse sentence.

* Support kill from pipe.

* Check expr type when kill query.

* Add tck test.

* Test slow query seperately.

* Fix group by.

* Fix kill current session query.

* Remove top n.

* Rebase and fix.

* Check sessions by listing sessions.

* Address comment.

* Address comment.

* Rebase and fix.

* Add more test.

* Address comment.

* Format.

* Charge session later.

* Fix test.
  • Loading branch information
CPWstatic committed Jul 2, 2021
1 parent bdb0683 commit 669694c
Show file tree
Hide file tree
Showing 55 changed files with 1,284 additions and 109 deletions.
10 changes: 10 additions & 0 deletions src/context/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ class QueryContext {
rctx_->resp().errorCode = ErrorCode::E_PARTIAL_SUCCEEDED;
}

void markKilled() {
killed_.exchange(true);
}

bool isKilled() const {
return killed_.load();
}

private:
void init();

Expand All @@ -147,6 +155,8 @@ class QueryContext {
std::unique_ptr<ObjectPool> objPool_;
std::unique_ptr<IdGenerator> idGen_;
std::unique_ptr<SymbolTable> symTable_;

std::atomic<bool> killed_{false};
};

} // namespace graph
Expand Down
2 changes: 1 addition & 1 deletion src/context/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ SET(CONTEXT_TEST_LIBS
$<TARGET_OBJECTS:validator_obj>
$<TARGET_OBJECTS:graph_flags_obj>
$<TARGET_OBJECTS:graph_auth_obj>
$<TARGET_OBJECTS:session_obj>
$<TARGET_OBJECTS:graph_session_obj>
$<TARGET_OBJECTS:planner_obj>
$<TARGET_OBJECTS:idgenerator_obj>
)
Expand Down
2 changes: 1 addition & 1 deletion src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:common_version_obj>
$<TARGET_OBJECTS:util_obj>
$<TARGET_OBJECTS:service_obj>
$<TARGET_OBJECTS:session_obj>
$<TARGET_OBJECTS:graph_session_obj>
$<TARGET_OBJECTS:query_engine_obj>
$<TARGET_OBJECTS:parser_obj>
$<TARGET_OBJECTS:validator_obj>
Expand Down
2 changes: 2 additions & 0 deletions src/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ nebula_add_library(
admin/SignInTSServiceExecutor.cpp
admin/SignOutTSServiceExecutor.cpp
admin/SessionExecutor.cpp
admin/ShowQueriesExecutor.cpp
admin/KillQueryExecutor.cpp
maintain/TagExecutor.cpp
maintain/TagIndexExecutor.cpp
maintain/EdgeExecutor.cpp
Expand Down
16 changes: 15 additions & 1 deletion src/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
#include "executor/admin/SwitchSpaceExecutor.h"
#include "executor/admin/UpdateUserExecutor.h"
#include "executor/admin/ZoneExecutor.h"
#include "executor/admin/ShowQueriesExecutor.h"
#include "executor/admin/KillQueryExecutor.h"
#include "executor/algo/BFSShortestPathExecutor.h"
#include "executor/algo/CartesianProductExecutor.h"
#include "executor/algo/ConjunctPathExecutor.h"
Expand Down Expand Up @@ -497,6 +499,12 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kUpdateSession: {
return pool->add(new UpdateSessionExecutor(node, qctx));
}
case PlanNode::Kind::kShowQueries: {
return pool->add(new ShowQueriesExecutor(node, qctx));
}
case PlanNode::Kind::kKillQuery: {
return pool->add(new KillQueryExecutor(node, qctx));
}
case PlanNode::Kind::kUnknown: {
LOG(FATAL) << "Unknown plan node kind " << static_cast<int32_t>(node->kind());
break;
Expand All @@ -521,6 +529,12 @@ Executor::Executor(const std::string &name, const PlanNode *node, QueryContext *
Executor::~Executor() {}

Status Executor::open() {
if (qctx_->isKilled()) {
VLOG(1) << "Execution is being killed. session: " << qctx()->rctx()->session()->id()
<< "ep: " << qctx()->plan()->id()
<< "query: " << qctx()->rctx()->query();
return Status::Error("Execution had been killed");
}
auto status = MemInfo::make();
NG_RETURN_IF_ERROR(status);
auto mem = std::move(status).value();
Expand Down Expand Up @@ -555,7 +569,7 @@ folly::Future<Status> Executor::start(Status status) const {
}

folly::Future<Status> Executor::error(Status status) const {
return folly::makeFuture<Status>(ExecutionError(std::move(status))).via(runner());
return folly::makeFuture<Status>(std::move(status)).via(runner());
}

void Executor::drop() {
Expand Down
145 changes: 145 additions & 0 deletions src/executor/admin/KillQueryExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "executor/admin/KillQueryExecutor.h"

#include "context/QueryContext.h"
#include "planner/plan/Admin.h"

namespace nebula {
namespace graph {
folly::Future<Status> KillQueryExecutor::execute() {
SCOPED_TIMER(&execTime_);

// TODO: permision check

QueriesMap toBeVerifiedQueries;
QueriesMap killQueries;
NG_RETURN_IF_ERROR(verifyTheQueriesByLocalCache(toBeVerifiedQueries, killQueries));

return qctx()
->getMetaClient()
->listSessions()
.via(runner())
.thenValue([toBeVerifiedQueries = std::move(toBeVerifiedQueries),
killQueries = std::move(killQueries),
this](StatusOr<meta::cpp2::ListSessionsResp> listResp) mutable {
std::vector<meta::cpp2::Session> sessionsInMeta;
if (listResp.ok()) {
sessionsInMeta = std::move(listResp.value()).get_sessions();
} else {
LOG(ERROR) << listResp.status();
}

auto status = verifyTheQueriesByMetaInfo(toBeVerifiedQueries, sessionsInMeta);
if (!status.ok()) {
return folly::makeFuture<StatusOr<meta::cpp2::ExecResp>>(status);
}

killCurrentHostQueries(killQueries);

// upload all queries to be killed to meta.
return qctx()->getMetaClient()->killQuery(std::move(killQueries));
})
.thenValue([](auto&& resp) {
if (!resp.ok()) {
return resp.status();
}
return Status::OK();
});
}

Status KillQueryExecutor::verifyTheQueriesByLocalCache(QueriesMap& toBeVerifiedQueries,
QueriesMap& killQueries) {
auto* killQuery = asNode<KillQuery>(node());
auto inputVar = killQuery->inputVar();
auto iter = ectx_->getResult(inputVar).iter();
DCHECK(!!iter);
QueryExpressionContext ctx(ectx_);
auto sessionExpr = killQuery->sessionId();
auto epExpr = killQuery->epId();

auto* session = qctx()->rctx()->session();
auto* sessionMgr = qctx_->rctx()->sessionMgr();
for (; iter->valid(); iter->next()) {
auto& sessionVal = sessionExpr->eval(ctx(iter.get()));
if (!sessionVal.isInt()) {
std::stringstream ss;
ss << "Session `" << sessionExpr->toString() << "' is not kind of"
<< " int, but was " << sessionVal.type();
return Status::Error(ss.str());
}
auto& epVal = epExpr->eval(ctx(iter.get()));
if (!epVal.isInt()) {
std::stringstream ss;
ss << "ExecutionPlanID `" << epExpr->toString() << "' is not kind of"
<< " int, but was " << epVal.type();
return Status::Error(ss.str());
}

auto sessionId = sessionVal.getInt();
auto epId = epVal.getInt();
if (sessionId == session->id() || sessionId < 0) {
if (!session->findQuery(epId)) {
return Status::Error("ExecutionPlanId[%ld] does not exist in current Session.",
epId);
}
killQueries[session->id()].emplace(epId);
} else {
auto sessionPtr = sessionMgr->findSessionFromCache(sessionId);
if (sessionPtr == nullptr) {
toBeVerifiedQueries[sessionId].emplace(epId);
} else if (!sessionPtr->findQuery(epId)) {
return Status::Error("ExecutionPlanId[%ld] does not exist in Session[%ld].",
epId, sessionId);
}
killQueries[sessionId].emplace(epId);
}
}
return Status::OK();
}

void KillQueryExecutor::killCurrentHostQueries(const QueriesMap& killQueries) {
auto* session = qctx()->rctx()->session();
auto* sessionMgr = qctx_->rctx()->sessionMgr();
for (auto& s : killQueries) {
auto sessionId = s.first;
for (auto& epId : s.second) {
if (sessionId == session->id()) {
session->markQueryKilled(epId);
} else {
auto sessionPtr = sessionMgr->findSessionFromCache(sessionId);
if (sessionPtr != nullptr) {
sessionPtr->markQueryKilled(epId);
}
}
}
}
}

Status KillQueryExecutor::verifyTheQueriesByMetaInfo(
const QueriesMap& toBeVerifiedQueries,
const std::vector<meta::cpp2::Session>& sessionsInMeta) {
for (auto& s : toBeVerifiedQueries) {
auto sessionId = s.first;
auto found =
std::find_if(sessionsInMeta.begin(), sessionsInMeta.end(), [sessionId](auto& val) {
return val.get_session_id() == sessionId;
});
if (found == sessionsInMeta.end()) {
return Status::Error("SessionId[%ld] does not exist", sessionId);
}
for (auto& epId : s.second) {
if (found->get_queries().find(epId) == found->get_queries().end()) {
return Status::Error(
"ExecutionPlanId[%ld] does not exist in Session[%ld].", epId, sessionId);
}
}
}
return Status::OK();
}
} // namespace graph
} // namespace nebula
34 changes: 34 additions & 0 deletions src/executor/admin/KillQueryExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef EXECUTOR_ADMIN_KILLQUERYEXECUTOR_H_
#define EXECUTOR_ADMIN_KILLQUERYEXECUTOR_H_

#include "executor/Executor.h"

namespace nebula {
namespace graph {
class KillQueryExecutor final : public Executor {
public:
KillQueryExecutor(const PlanNode *node, QueryContext *qctx)
: Executor("KillQueryExecutor", node, qctx) {}

folly::Future<Status> execute() override;

private:
using QueriesMap = std::unordered_map<SessionID, std::unordered_set<ExecutionPlanID>>;
Status verifyTheQueriesByLocalCache(QueriesMap& toBeVerifiedQueries,
QueriesMap& killQueries);

Status verifyTheQueriesByMetaInfo(const QueriesMap& toBeVerifiedQueries,
const std::vector<meta::cpp2::Session>& sessionsInMeta);

void killCurrentHostQueries(const QueriesMap& killQueries);
};
} // namespace graph
} // namespace nebula

#endif // EXECUTOR_ADMIN_KILLQUERYEXECUTOR_H_
99 changes: 99 additions & 0 deletions src/executor/admin/ShowQueriesExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include <thrift/lib/cpp/util/EnumUtils.h>

#include "executor/admin/ShowQueriesExecutor.h"

#include "common/time/TimeUtils.h"
#include "context/QueryContext.h"
#include "planner/plan/Admin.h"
#include "util/ScopedTimer.h"

namespace nebula {
namespace graph {
folly::Future<Status> ShowQueriesExecutor::execute() {
SCOPED_TIMER(&execTime_);

auto* showQueries = asNode<ShowQueries>(node());
auto isAll = showQueries->isAll();

if (!isAll) {
return showCurrentSessionQueries();
} else {
return showAllSessionQueries();
}
}

folly::Future<Status> ShowQueriesExecutor::showCurrentSessionQueries() {
DataSet dataSet({"SessionID",
"ExecutionPlanID",
"User",
"Host",
"StartTime",
"DurationInUSec",
"Status",
"Query"});
auto* session = qctx()->rctx()->session();
auto sessionInMeta = session->getSession();

addQueries(sessionInMeta, dataSet);
return finish(ResultBuilder()
.value(Value(std::move(dataSet)))
.iter(Iterator::Kind::kSequential)
.finish());
}

// The queries might not sync to meta completely.
folly::Future<Status> ShowQueriesExecutor::showAllSessionQueries() {
return qctx()->getMetaClient()->listSessions()
.via(runner())
.thenValue([this](StatusOr<meta::cpp2::ListSessionsResp> resp) {
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
return Status::Error("Show sessions failed: %s.",
resp.status().toString().c_str());
}
auto sessions = resp.value().get_sessions();
DataSet dataSet({"SessionID",
"ExecutionPlanID",
"User",
"Host",
"StartTime",
"DurationInUSec",
"Status",
"Query"});
for (auto& session : sessions) {
addQueries(session, dataSet);
}
return finish(ResultBuilder()
.value(Value(std::move(dataSet)))
.iter(Iterator::Kind::kSequential)
.finish());
});
}

void ShowQueriesExecutor::addQueries(const meta::cpp2::Session& session, DataSet& dataSet) const {
auto& queries = session.get_queries();
for (auto& query : queries) {
Row row;
row.values.emplace_back(session.get_session_id());
row.values.emplace_back(query.first);
row.values.emplace_back(session.get_user_name());
row.values.emplace_back(query.second.get_graph_addr().toString());
auto dateTime =
time::TimeConversion::unixSecondsToDateTime(query.second.get_start_time() / 1000000);
dateTime.microsec = query.second.get_start_time() % 1000000;
row.values.emplace_back(std::move(dateTime));
row.values.emplace_back(query.second.get_duration());
row.values.emplace_back(apache::thrift::util::enumNameSafe(query.second.get_status()));
row.values.emplace_back(query.second.get_query());
dataSet.rows.emplace_back(std::move(row));
}
}

} // namespace graph
} // namespace nebula
Loading

0 comments on commit 669694c

Please sign in to comment.