Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Slow query. #1152

Merged
merged 34 commits into from
Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
16af176
Add graph session manager.
CPWstatic Jun 15, 2021
9e40a8e
Fix usage of concurrent map.
CPWstatic Jun 16, 2021
dcda69b
Use emplace.
CPWstatic Jun 16, 2021
bd83fc2
Save the queries to session.
CPWstatic Jun 16, 2021
9b8ac81
Kill query when execute.
CPWstatic Jun 17, 2021
9d0ffbe
Update the killed query from meta.
CPWstatic Jun 17, 2021
3f76623
Add show and kill query sentence.
CPWstatic Jun 18, 2021
7b5e8e5
Add validator and node.
CPWstatic Jun 19, 2021
cfb7614
Support kill query.
CPWstatic Jun 21, 2021
73c1edc
Kill the query if session is destroyed and update addr to session if …
CPWstatic Jun 21, 2021
007550b
Fix update the killed queries from meta and delete query if execute e…
CPWstatic Jun 21, 2021
ad8a5f9
Load all unexpired sessions from meta, remove all expired sessions, r…
CPWstatic Jun 22, 2021
d56fda0
Add toString and test.
CPWstatic Jun 23, 2021
c1ee66a
Mark query killed when session sign out or expired.
CPWstatic Jun 24, 2021
036fb12
Add explain.
CPWstatic Jun 24, 2021
090e06d
Add test and fix top N.
CPWstatic Jun 24, 2021
f106ab4
Make show queries as a traverse sentence.
CPWstatic Jun 25, 2021
3fe45a6
Support kill from pipe.
CPWstatic Jun 26, 2021
6ea4c99
Check expr type when kill query.
CPWstatic Jun 27, 2021
d70112f
Add tck test.
CPWstatic Jun 27, 2021
3f2a067
Test slow query seperately.
CPWstatic Jun 28, 2021
a53af68
Fix group by.
CPWstatic Jun 28, 2021
a7f7890
Fix kill current session query.
CPWstatic Jun 29, 2021
1e45673
Remove top n.
CPWstatic Jun 29, 2021
aa1d6ef
Rebase and fix.
CPWstatic Jun 29, 2021
a89d6d3
Check sessions by listing sessions.
CPWstatic Jun 30, 2021
6f6f3db
Address comment.
CPWstatic Jun 30, 2021
c86e640
Address comment.
CPWstatic Jul 1, 2021
d96001f
Rebase and fix.
CPWstatic Jul 1, 2021
e3eb9b2
Add more test.
CPWstatic Jul 1, 2021
04671b5
Address comment.
CPWstatic Jul 1, 2021
ed147ed
Format.
CPWstatic Jul 1, 2021
b698aa0
Charge session later.
CPWstatic Jul 1, 2021
240ca40
Fix test.
CPWstatic Jul 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/context/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ class QueryContext {
rctx_->resp().errorCode = ErrorCode::E_PARTIAL_SUCCEEDED;
}

void markKilled() {
killed_.exchange(true);
}

bool isKilled() const {
return killed_.load();
}

private:
void init();

Expand All @@ -147,6 +155,8 @@ class QueryContext {
std::unique_ptr<ObjectPool> objPool_;
std::unique_ptr<IdGenerator> idGen_;
std::unique_ptr<SymbolTable> symTable_;

std::atomic<bool> killed_{false};
};

} // namespace graph
Expand Down
2 changes: 1 addition & 1 deletion src/context/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ SET(CONTEXT_TEST_LIBS
$<TARGET_OBJECTS:validator_obj>
$<TARGET_OBJECTS:graph_flags_obj>
$<TARGET_OBJECTS:graph_auth_obj>
$<TARGET_OBJECTS:session_obj>
$<TARGET_OBJECTS:graph_session_obj>
$<TARGET_OBJECTS:planner_obj>
$<TARGET_OBJECTS:idgenerator_obj>
)
Expand Down
2 changes: 1 addition & 1 deletion src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:common_version_obj>
$<TARGET_OBJECTS:util_obj>
$<TARGET_OBJECTS:service_obj>
$<TARGET_OBJECTS:session_obj>
$<TARGET_OBJECTS:graph_session_obj>
$<TARGET_OBJECTS:query_engine_obj>
$<TARGET_OBJECTS:parser_obj>
$<TARGET_OBJECTS:validator_obj>
Expand Down
2 changes: 2 additions & 0 deletions src/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ nebula_add_library(
admin/SignInTSServiceExecutor.cpp
admin/SignOutTSServiceExecutor.cpp
admin/SessionExecutor.cpp
admin/ShowQueriesExecutor.cpp
admin/KillQueryExecutor.cpp
maintain/TagExecutor.cpp
maintain/TagIndexExecutor.cpp
maintain/EdgeExecutor.cpp
Expand Down
16 changes: 15 additions & 1 deletion src/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
#include "executor/admin/SwitchSpaceExecutor.h"
#include "executor/admin/UpdateUserExecutor.h"
#include "executor/admin/ZoneExecutor.h"
#include "executor/admin/ShowQueriesExecutor.h"
#include "executor/admin/KillQueryExecutor.h"
#include "executor/algo/BFSShortestPathExecutor.h"
#include "executor/algo/CartesianProductExecutor.h"
#include "executor/algo/ConjunctPathExecutor.h"
Expand Down Expand Up @@ -497,6 +499,12 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kUpdateSession: {
return pool->add(new UpdateSessionExecutor(node, qctx));
}
case PlanNode::Kind::kShowQueries: {
return pool->add(new ShowQueriesExecutor(node, qctx));
}
case PlanNode::Kind::kKillQuery: {
return pool->add(new KillQueryExecutor(node, qctx));
}
case PlanNode::Kind::kUnknown: {
LOG(FATAL) << "Unknown plan node kind " << static_cast<int32_t>(node->kind());
break;
Expand All @@ -521,6 +529,12 @@ Executor::Executor(const std::string &name, const PlanNode *node, QueryContext *
Executor::~Executor() {}

Status Executor::open() {
if (qctx_->isKilled()) {
VLOG(1) << "Execution is being killed. session: " << qctx()->rctx()->session()->id()
<< "ep: " << qctx()->plan()->id()
<< "query: " << qctx()->rctx()->query();
return Status::Error("Execution had been killed");
}
auto status = MemInfo::make();
NG_RETURN_IF_ERROR(status);
auto mem = std::move(status).value();
Expand Down Expand Up @@ -555,7 +569,7 @@ folly::Future<Status> Executor::start(Status status) const {
}

folly::Future<Status> Executor::error(Status status) const {
return folly::makeFuture<Status>(ExecutionError(std::move(status))).via(runner());
return folly::makeFuture<Status>(std::move(status)).via(runner());
}

void Executor::drop() {
Expand Down
145 changes: 145 additions & 0 deletions src/executor/admin/KillQueryExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "executor/admin/KillQueryExecutor.h"

#include "context/QueryContext.h"
#include "planner/plan/Admin.h"

namespace nebula {
namespace graph {
folly::Future<Status> KillQueryExecutor::execute() {
SCOPED_TIMER(&execTime_);

// TODO: permision check

QueriesMap toBeVerifiedQueries;
QueriesMap killQueries;
NG_RETURN_IF_ERROR(verifyTheQueriesByLocalCache(toBeVerifiedQueries, killQueries));

return qctx()
->getMetaClient()
->listSessions()
.via(runner())
.thenValue([toBeVerifiedQueries = std::move(toBeVerifiedQueries),
killQueries = std::move(killQueries),
this](StatusOr<meta::cpp2::ListSessionsResp> listResp) mutable {
std::vector<meta::cpp2::Session> sessionsInMeta;
if (listResp.ok()) {
sessionsInMeta = std::move(listResp.value()).get_sessions();
} else {
LOG(ERROR) << listResp.status();
}

auto status = verifyTheQueriesByMetaInfo(toBeVerifiedQueries, sessionsInMeta);
if (!status.ok()) {
return folly::makeFuture<StatusOr<meta::cpp2::ExecResp>>(status);
}

killCurrentHostQueries(killQueries);

// upload all queries to be killed to meta.
return qctx()->getMetaClient()->killQuery(std::move(killQueries));
})
.thenValue([](auto&& resp) {
if (!resp.ok()) {
return resp.status();
}
return Status::OK();
});
}

Status KillQueryExecutor::verifyTheQueriesByLocalCache(QueriesMap& toBeVerifiedQueries,
QueriesMap& killQueries) {
auto* killQuery = asNode<KillQuery>(node());
auto inputVar = killQuery->inputVar();
auto iter = ectx_->getResult(inputVar).iter();
DCHECK(!!iter);
QueryExpressionContext ctx(ectx_);
auto sessionExpr = killQuery->sessionId();
auto epExpr = killQuery->epId();

auto* session = qctx()->rctx()->session();
auto* sessionMgr = qctx_->rctx()->sessionMgr();
for (; iter->valid(); iter->next()) {
auto& sessionVal = sessionExpr->eval(ctx(iter.get()));
if (!sessionVal.isInt()) {
std::stringstream ss;
ss << "Session `" << sessionExpr->toString() << "' is not kind of"
<< " int, but was " << sessionVal.type();
return Status::Error(ss.str());
}
auto& epVal = epExpr->eval(ctx(iter.get()));
if (!epVal.isInt()) {
std::stringstream ss;
ss << "ExecutionPlanID `" << epExpr->toString() << "' is not kind of"
<< " int, but was " << epVal.type();
return Status::Error(ss.str());
}

auto sessionId = sessionVal.getInt();
auto epId = epVal.getInt();
if (sessionId == session->id() || sessionId < 0) {
CPWstatic marked this conversation as resolved.
Show resolved Hide resolved
if (!session->findQuery(epId)) {
return Status::Error("ExecutionPlanId[%ld] does not exist in current Session.",
epId);
}
killQueries[session->id()].emplace(epId);
} else {
auto sessionPtr = sessionMgr->findSessionFromCache(sessionId);
if (sessionPtr == nullptr) {
toBeVerifiedQueries[sessionId].emplace(epId);
} else if (!sessionPtr->findQuery(epId)) {
return Status::Error("ExecutionPlanId[%ld] does not exist in Session[%ld].",
epId, sessionId);
}
killQueries[sessionId].emplace(epId);
CPWstatic marked this conversation as resolved.
Show resolved Hide resolved
}
}
return Status::OK();
}

void KillQueryExecutor::killCurrentHostQueries(const QueriesMap& killQueries) {
auto* session = qctx()->rctx()->session();
auto* sessionMgr = qctx_->rctx()->sessionMgr();
for (auto& s : killQueries) {
auto sessionId = s.first;
for (auto& epId : s.second) {
if (sessionId == session->id()) {
session->markQueryKilled(epId);
} else {
auto sessionPtr = sessionMgr->findSessionFromCache(sessionId);
if (sessionPtr != nullptr) {
sessionPtr->markQueryKilled(epId);
}
}
}
}
}

Status KillQueryExecutor::verifyTheQueriesByMetaInfo(
const QueriesMap& toBeVerifiedQueries,
const std::vector<meta::cpp2::Session>& sessionsInMeta) {
for (auto& s : toBeVerifiedQueries) {
auto sessionId = s.first;
auto found =
std::find_if(sessionsInMeta.begin(), sessionsInMeta.end(), [sessionId](auto& val) {
return val.get_session_id() == sessionId;
});
if (found == sessionsInMeta.end()) {
return Status::Error("SessionId[%ld] does not exist", sessionId);
}
for (auto& epId : s.second) {
if (found->get_queries().find(epId) == found->get_queries().end()) {
return Status::Error(
"ExecutionPlanId[%ld] does not exist in Session[%ld].", epId, sessionId);
}
}
}
return Status::OK();
}
} // namespace graph
} // namespace nebula
34 changes: 34 additions & 0 deletions src/executor/admin/KillQueryExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef EXECUTOR_ADMIN_KILLQUERYEXECUTOR_H_
#define EXECUTOR_ADMIN_KILLQUERYEXECUTOR_H_

#include "executor/Executor.h"

namespace nebula {
namespace graph {
class KillQueryExecutor final : public Executor {
public:
KillQueryExecutor(const PlanNode *node, QueryContext *qctx)
: Executor("KillQueryExecutor", node, qctx) {}

folly::Future<Status> execute() override;

private:
using QueriesMap = std::unordered_map<SessionID, std::unordered_set<ExecutionPlanID>>;
Status verifyTheQueriesByLocalCache(QueriesMap& toBeVerifiedQueries,
QueriesMap& killQueries);

Status verifyTheQueriesByMetaInfo(const QueriesMap& toBeVerifiedQueries,
const std::vector<meta::cpp2::Session>& sessionsInMeta);

void killCurrentHostQueries(const QueriesMap& killQueries);
};
} // namespace graph
} // namespace nebula

#endif // EXECUTOR_ADMIN_KILLQUERYEXECUTOR_H_
99 changes: 99 additions & 0 deletions src/executor/admin/ShowQueriesExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include <thrift/lib/cpp/util/EnumUtils.h>

#include "executor/admin/ShowQueriesExecutor.h"

#include "common/time/TimeUtils.h"
#include "context/QueryContext.h"
#include "planner/plan/Admin.h"
#include "util/ScopedTimer.h"

namespace nebula {
namespace graph {
folly::Future<Status> ShowQueriesExecutor::execute() {
SCOPED_TIMER(&execTime_);

auto* showQueries = asNode<ShowQueries>(node());
auto isAll = showQueries->isAll();

if (!isAll) {
return showCurrentSessionQueries();
} else {
return showAllSessionQueries();
}
}

folly::Future<Status> ShowQueriesExecutor::showCurrentSessionQueries() {
DataSet dataSet({"SessionID",
"ExecutionPlanID",
"User",
"Host",
"StartTime",
"DurationInUSec",
"Status",
"Query"});
auto* session = qctx()->rctx()->session();
auto sessionInMeta = session->getSession();

addQueries(sessionInMeta, dataSet);
return finish(ResultBuilder()
.value(Value(std::move(dataSet)))
.iter(Iterator::Kind::kSequential)
.finish());
}

// The queries might not sync to meta completely.
folly::Future<Status> ShowQueriesExecutor::showAllSessionQueries() {
return qctx()->getMetaClient()->listSessions()
.via(runner())
.thenValue([this](StatusOr<meta::cpp2::ListSessionsResp> resp) {
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
return Status::Error("Show sessions failed: %s.",
resp.status().toString().c_str());
}
auto sessions = resp.value().get_sessions();
DataSet dataSet({"SessionID",
"ExecutionPlanID",
"User",
"Host",
"StartTime",
"DurationInUSec",
"Status",
"Query"});
for (auto& session : sessions) {
addQueries(session, dataSet);
}
return finish(ResultBuilder()
.value(Value(std::move(dataSet)))
.iter(Iterator::Kind::kSequential)
.finish());
});
}

void ShowQueriesExecutor::addQueries(const meta::cpp2::Session& session, DataSet& dataSet) const {
auto& queries = session.get_queries();
for (auto& query : queries) {
Row row;
row.values.emplace_back(session.get_session_id());
row.values.emplace_back(query.first);
row.values.emplace_back(session.get_user_name());
row.values.emplace_back(query.second.get_graph_addr().toString());
auto dateTime =
time::TimeConversion::unixSecondsToDateTime(query.second.get_start_time() / 1000000);
dateTime.microsec = query.second.get_start_time() % 1000000;
row.values.emplace_back(std::move(dateTime));
row.values.emplace_back(query.second.get_duration());
row.values.emplace_back(apache::thrift::util::enumNameSafe(query.second.get_status()));
row.values.emplace_back(query.second.get_query());
dataSet.rows.emplace_back(std::move(row));
}
}

} // namespace graph
} // namespace nebula
Loading