Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
Slow query. (#565)
Browse files Browse the repository at this point in the history
* Add session manager.

* use protected.

* Return killed queries when update sessions.

* Support kill query.

* Update interface.

* Support kill multi query.

* Fix typo.

* Address comment.
  • Loading branch information
CPWstatic committed Jul 1, 2021
1 parent a1b3787 commit df7a638
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 6 deletions.
24 changes: 21 additions & 3 deletions src/common/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3652,17 +3652,17 @@ MetaClient::createSession(const std::string &userName,
return future;
}

folly::Future<StatusOr<cpp2::ExecResp>>
folly::Future<StatusOr<cpp2::UpdateSessionsResp>>
MetaClient::updateSessions(const std::vector<cpp2::Session>& sessions) {
cpp2::UpdateSessionsReq req;
req.set_sessions(sessions);
folly::Promise<StatusOr<cpp2::ExecResp>> promise;
folly::Promise<StatusOr<cpp2::UpdateSessionsResp>> promise;
auto future = promise.getFuture();
getResponse(std::move(req),
[] (auto client, auto request) {
return client->future_updateSessions(request);
},
[] (cpp2::ExecResp&& resp) -> decltype(auto){
[] (cpp2::UpdateSessionsResp&& resp) -> decltype(auto){
return std::move(resp);
},
std::move(promise),
Expand Down Expand Up @@ -3718,6 +3718,24 @@ folly::Future<StatusOr<cpp2::ExecResp>> MetaClient::removeSession(SessionID sess
return future;
}

folly::Future<StatusOr<cpp2::ExecResp>> MetaClient::killQuery(
std::unordered_map<SessionID, std::unordered_set<ExecutionPlanID>> killQueries) {
cpp2::KillQueryReq req;
req.set_kill_queries(std::move(killQueries));
folly::Promise<StatusOr<cpp2::ExecResp>> promise;
auto future = promise.getFuture();
getResponse(std::move(req),
[] (auto client, auto request) {
return client->future_killQuery(request);
},
[] (cpp2::ExecResp&& resp) -> decltype(auto){
return std::move(resp);
},
std::move(promise),
true);
return future;
}

folly::Future<StatusOr<bool>> MetaClient::download(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
Expand Down
5 changes: 4 additions & 1 deletion src/common/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class MetaClient {
folly::Future<StatusOr<cpp2::CreateSessionResp>> createSession(
const std::string &userName, const HostAddr& graphAddr, const std::string &clientIp);

folly::Future<StatusOr<cpp2::ExecResp>>
folly::Future<StatusOr<cpp2::UpdateSessionsResp>>
updateSessions(const std::vector<cpp2::Session>& sessions);

folly::Future<StatusOr<cpp2::ListSessionsResp>> listSessions();
Expand All @@ -460,6 +460,9 @@ class MetaClient {

folly::Future<StatusOr<cpp2::ExecResp>> removeSession(SessionID sessionId);

folly::Future<StatusOr<cpp2::ExecResp>> killQuery(
std::unordered_map<SessionID, std::unordered_set<ExecutionPlanID>> killQueries);

// Opeartions for cache.
StatusOr<GraphSpaceID> getSpaceIdByNameFromCache(const std::string& name);

Expand Down
1 change: 1 addition & 0 deletions src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
X(E_LIST_CLUSTER_GET_ABS_PATH_FAILURE , -2071) \
X(E_GET_META_DIR_FAILURE , -2072) \
\
X(E_QUERY_NOT_FOUND , -2073) \
/* 3xxx for storaged */ \
X(E_CONSENSUS_ERROR , -3001) \
X(E_KEY_HAS_EXISTS , -3002) \
Expand Down
6 changes: 5 additions & 1 deletion src/common/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ typedef i32 (cpp.type = "nebula::Port") Port

typedef i64 (cpp.type = "nebula::SessionID") SessionID

typedef i64 (cpp.type = "nebula::ExecutionPlanID") ExecutionPlanID

// !! Struct Date has a shadow data type defined in the Date.h
// So any change here needs to be reflected to the shadow type there
struct Date {
Expand Down Expand Up @@ -324,13 +326,15 @@ enum ErrorCode {

// RESTORE Failure
E_RESTORE_FAILURE = -2068,

E_SESSION_NOT_FOUND = -2069,

// ListClusterInfo Failure
E_LIST_CLUSTER_FAILURE = -2070,
E_LIST_CLUSTER_GET_ABS_PATH_FAILURE = -2071,
E_GET_META_DIR_FAILURE = -2072,
E_GET_META_DIR_FAILURE = -2072,

E_QUERY_NOT_FOUND = -2073,

// 3xxx for storaged
E_CONSENSUS_ERROR = -3001,
Expand Down
30 changes: 29 additions & 1 deletion src/common/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,20 @@ struct ListFTIndexesResp {
3: map<binary, FTIndex> (cpp.template = "std::unordered_map") indexes,
}

enum QueryStatus {
RUNNING = 0x01,
KILLING = 0x02,
} (cpp.enum_strict)

struct QueryDesc {
1: common.Timestamp start_time;
2: QueryStatus status;
3: i64 duration;
4: binary query;
// The session might transfer between query engines, but the query do not, we must
// record which query engine the query belongs to
5: common.HostAddr graph_addr,
}

struct Session {
1: common.SessionID session_id,
Expand All @@ -1074,6 +1088,7 @@ struct Session {
7: i32 timezone,
8: binary client_ip,
9: map<binary, common.Value>(cpp.template = "std::unordered_map") configs,
10: map<common.ExecutionPlanID, QueryDesc>(cpp.template = "std::unordered_map") queries;
}

struct CreateSessionReq {
Expand All @@ -1092,6 +1107,13 @@ struct UpdateSessionsReq {
1: list<Session> sessions,
}

struct UpdateSessionsResp {
1: common.ErrorCode code,
2: common.HostAddr leader,
3: map<common.SessionID, map<common.ExecutionPlanID, QueryDesc> (cpp.template = "std::unordered_map")>
(cpp.template = "std::unordered_map") killed_queries,
}

struct ListSessionsReq {
}

Expand All @@ -1115,6 +1137,11 @@ struct RemoveSessionReq {
1: common.SessionID session_id,
}

struct KillQueryReq {
1: map<common.SessionID, set<common.ExecutionPlanID> (cpp.template = "std::unordered_set")>
(cpp.template = "std::unordered_map") kill_queries,
}

struct ReportTaskReq {
1: common.ErrorCode code,
2: i32 job_id,
Expand Down Expand Up @@ -1238,10 +1265,11 @@ service MetaService {
ListFTIndexesResp listFTIndexes(1: ListFTIndexesReq req);

CreateSessionResp createSession(1: CreateSessionReq req);
ExecResp updateSessions(1: UpdateSessionsReq req);
UpdateSessionsResp updateSessions(1: UpdateSessionsReq req);
ListSessionsResp listSessions(1: ListSessionsReq req);
GetSessionResp getSession(1: GetSessionReq req);
ExecResp removeSession(1: RemoveSessionReq req);
ExecResp killQuery(1: KillQueryReq req);

ExecResp reportTaskFinish(1: ReportTaskReq req);

Expand Down
71 changes: 71 additions & 0 deletions src/common/session/SessionManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef COMMON_SESSION_SESSIONMANAGER_H_
#define COMMON_SESSION_SESSIONMANAGER_H_

#include <folly/concurrency/ConcurrentHashMap.h>

#include "common/base/Base.h"
#include "common/thrift/ThriftTypes.h"
#include "common/base/StatusOr.h"
#include "common/thread/GenericWorker.h"
#include "common/clients/meta/MetaClient.h"


/**
* SessionManager manages the client sessions, e.g. create new, find existing and drop expired.
*/

namespace nebula {

template <class SessionType>
class SessionManager {
public:
SessionManager(meta::MetaClient* metaClient, const HostAddr &hostAddr) {
metaClient_ = metaClient;
myAddr_ = hostAddr;
scavenger_ = std::make_unique<thread::GenericWorker>();
auto ok = scavenger_->start("session-manager");
DCHECK(ok);
}

virtual ~SessionManager() {
if (scavenger_ != nullptr) {
scavenger_->stop();
scavenger_->wait();
scavenger_.reset();
}
}

/**
* Create a new session
*/
virtual folly::Future<StatusOr<std::shared_ptr<SessionType>>>
createSession(const std::string userName,
const std::string clientIp,
folly::Executor* runner) = 0;

/**
* Remove a session
*/
virtual void removeSession(SessionID id) = 0;

virtual folly::Future<StatusOr<std::shared_ptr<SessionType>>>
findSession(SessionID id, folly::Executor* runner) = 0;

protected:
using SessionPtr = std::shared_ptr<SessionType>;
folly::ConcurrentHashMap<SessionID, SessionPtr> activeSessions_;
std::unique_ptr<thread::GenericWorker> scavenger_;
meta::MetaClient *metaClient_{nullptr};
HostAddr myAddr_;
};

} // namespace nebula


#endif // COMMON_SESSION_SESSIONMANAGER_H_
1 change: 1 addition & 0 deletions src/common/thrift/ThriftTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,6 @@ using ZoneID = int32_t;

using SessionID = int64_t;

using ExecutionPlanID = int64_t;
} // namespace nebula
#endif // COMMON_THRIFT_THRIFTTYPES_H_

0 comments on commit df7a638

Please sign in to comment.