Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Slow query. #565

Merged
merged 9 commits into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions src/common/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3652,17 +3652,17 @@ MetaClient::createSession(const std::string &userName,
return future;
}

folly::Future<StatusOr<cpp2::ExecResp>>
folly::Future<StatusOr<cpp2::UpdateSessionsResp>>
MetaClient::updateSessions(const std::vector<cpp2::Session>& sessions) {
cpp2::UpdateSessionsReq req;
req.set_sessions(sessions);
folly::Promise<StatusOr<cpp2::ExecResp>> promise;
folly::Promise<StatusOr<cpp2::UpdateSessionsResp>> promise;
auto future = promise.getFuture();
getResponse(std::move(req),
[] (auto client, auto request) {
return client->future_updateSessions(request);
},
[] (cpp2::ExecResp&& resp) -> decltype(auto){
[] (cpp2::UpdateSessionsResp&& resp) -> decltype(auto){
return std::move(resp);
},
std::move(promise),
Expand Down Expand Up @@ -3718,6 +3718,24 @@ folly::Future<StatusOr<cpp2::ExecResp>> MetaClient::removeSession(SessionID sess
return future;
}

folly::Future<StatusOr<cpp2::ExecResp>> MetaClient::killQuery(
std::unordered_map<SessionID, std::unordered_set<ExecutionPlanID>> killQueries) {
cpp2::KillQueryReq req;
req.set_kill_queries(std::move(killQueries));
folly::Promise<StatusOr<cpp2::ExecResp>> promise;
auto future = promise.getFuture();
getResponse(std::move(req),
[] (auto client, auto request) {
return client->future_killQuery(request);
},
[] (cpp2::ExecResp&& resp) -> decltype(auto){
return std::move(resp);
},
std::move(promise),
true);
return future;
}

folly::Future<StatusOr<bool>> MetaClient::download(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
Expand Down
5 changes: 4 additions & 1 deletion src/common/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class MetaClient {
folly::Future<StatusOr<cpp2::CreateSessionResp>> createSession(
const std::string &userName, const HostAddr& graphAddr, const std::string &clientIp);

folly::Future<StatusOr<cpp2::ExecResp>>
folly::Future<StatusOr<cpp2::UpdateSessionsResp>>
updateSessions(const std::vector<cpp2::Session>& sessions);

folly::Future<StatusOr<cpp2::ListSessionsResp>> listSessions();
Expand All @@ -460,6 +460,9 @@ class MetaClient {

folly::Future<StatusOr<cpp2::ExecResp>> removeSession(SessionID sessionId);

folly::Future<StatusOr<cpp2::ExecResp>> killQuery(
std::unordered_map<SessionID, std::unordered_set<ExecutionPlanID>> killQueries);

// Opeartions for cache.
StatusOr<GraphSpaceID> getSpaceIdByNameFromCache(const std::string& name);

Expand Down
1 change: 1 addition & 0 deletions src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
X(E_LIST_CLUSTER_GET_ABS_PATH_FAILURE , -2071) \
X(E_GET_META_DIR_FAILURE , -2072) \
\
X(E_QUERY_NOT_FOUND , -2073) \
/* 3xxx for storaged */ \
X(E_CONSENSUS_ERROR , -3001) \
X(E_KEY_HAS_EXISTS , -3002) \
Expand Down
6 changes: 5 additions & 1 deletion src/common/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ typedef i32 (cpp.type = "nebula::Port") Port

typedef i64 (cpp.type = "nebula::SessionID") SessionID

typedef i64 (cpp.type = "nebula::ExecutionPlanID") ExecutionPlanID

// !! Struct Date has a shadow data type defined in the Date.h
// So any change here needs to be reflected to the shadow type there
struct Date {
Expand Down Expand Up @@ -324,13 +326,15 @@ enum ErrorCode {

// RESTORE Failure
E_RESTORE_FAILURE = -2068,

E_SESSION_NOT_FOUND = -2069,

// ListClusterInfo Failure
E_LIST_CLUSTER_FAILURE = -2070,
E_LIST_CLUSTER_GET_ABS_PATH_FAILURE = -2071,
E_GET_META_DIR_FAILURE = -2072,
E_GET_META_DIR_FAILURE = -2072,

E_QUERY_NOT_FOUND = -2073,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to add to nebula-common/src/common/graph/Response.h


// 3xxx for storaged
E_CONSENSUS_ERROR = -3001,
Expand Down
30 changes: 29 additions & 1 deletion src/common/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,20 @@ struct ListFTIndexesResp {
3: map<binary, FTIndex> (cpp.template = "std::unordered_map") indexes,
}

enum QueryStatus {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer state than status

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems no difference between these two words? Besides we have a lot struct defined as Status now.

RUNNING = 0x01,
KILLING = 0x02,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we not need a finished or stopped state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The finished queries will be removed.

} (cpp.enum_strict)

struct QueryDesc {
1: common.Timestamp start_time;
2: QueryStatus status;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

3: i64 duration;
4: binary query;
// The session might transfer between query engines, but the query do not, we must
// record which query engine the query belongs to
5: common.HostAddr graph_addr,
}

struct Session {
1: common.SessionID session_id,
Expand All @@ -1074,6 +1088,7 @@ struct Session {
7: i32 timezone,
8: binary client_ip,
9: map<binary, common.Value>(cpp.template = "std::unordered_map") configs,
10: map<common.ExecutionPlanID, QueryDesc>(cpp.template = "std::unordered_map") queries;
}

struct CreateSessionReq {
Expand All @@ -1092,6 +1107,13 @@ struct UpdateSessionsReq {
1: list<Session> sessions,
}

struct UpdateSessionsResp {
1: common.ErrorCode code,
2: common.HostAddr leader,
3: map<common.SessionID, map<common.ExecutionPlanID, QueryDesc> (cpp.template = "std::unordered_map")>
(cpp.template = "std::unordered_map") killed_queries,
}

struct ListSessionsReq {
}

Expand All @@ -1115,6 +1137,11 @@ struct RemoveSessionReq {
1: common.SessionID session_id,
}

struct KillQueryReq {
1: map<common.SessionID, set<common.ExecutionPlanID> (cpp.template = "std::unordered_set")>
(cpp.template = "std::unordered_map") kill_queries,
}

struct ReportTaskReq {
1: common.ErrorCode code,
2: i32 job_id,
Expand Down Expand Up @@ -1238,10 +1265,11 @@ service MetaService {
ListFTIndexesResp listFTIndexes(1: ListFTIndexesReq req);

CreateSessionResp createSession(1: CreateSessionReq req);
ExecResp updateSessions(1: UpdateSessionsReq req);
UpdateSessionsResp updateSessions(1: UpdateSessionsReq req);
ListSessionsResp listSessions(1: ListSessionsReq req);
GetSessionResp getSession(1: GetSessionReq req);
ExecResp removeSession(1: RemoveSessionReq req);
ExecResp killQuery(1: KillQueryReq req);

ExecResp reportTaskFinish(1: ReportTaskReq req);

Expand Down
71 changes: 71 additions & 0 deletions src/common/session/SessionManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef COMMON_SESSION_SESSIONMANAGER_H_
#define COMMON_SESSION_SESSIONMANAGER_H_

#include <folly/concurrency/ConcurrentHashMap.h>

#include "common/base/Base.h"
#include "common/thrift/ThriftTypes.h"
#include "common/base/StatusOr.h"
#include "common/thread/GenericWorker.h"
#include "common/clients/meta/MetaClient.h"


/**
* SessionManager manages the client sessions, e.g. create new, find existing and drop expired.
*/

namespace nebula {

template <class SessionType>
class SessionManager {
public:
SessionManager(meta::MetaClient* metaClient, const HostAddr &hostAddr) {
metaClient_ = metaClient;
myAddr_ = hostAddr;
scavenger_ = std::make_unique<thread::GenericWorker>();
auto ok = scavenger_->start("session-manager");
DCHECK(ok);
}

virtual ~SessionManager() {
if (scavenger_ != nullptr) {
scavenger_->stop();
scavenger_->wait();
scavenger_.reset();
}
}

/**
* Create a new session
*/
virtual folly::Future<StatusOr<std::shared_ptr<SessionType>>>
createSession(const std::string userName,
const std::string clientIp,
folly::Executor* runner) = 0;

/**
* Remove a session
*/
virtual void removeSession(SessionID id) = 0;

virtual folly::Future<StatusOr<std::shared_ptr<SessionType>>>
findSession(SessionID id, folly::Executor* runner) = 0;

protected:
using SessionPtr = std::shared_ptr<SessionType>;
folly::ConcurrentHashMap<SessionID, SessionPtr> activeSessions_;
std::unique_ptr<thread::GenericWorker> scavenger_;
meta::MetaClient *metaClient_{nullptr};
HostAddr myAddr_;
};

} // namespace nebula


#endif // COMMON_SESSION_SESSIONMANAGER_H_
1 change: 1 addition & 0 deletions src/common/thrift/ThriftTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,6 @@ using ZoneID = int32_t;

using SessionID = int64_t;

using ExecutionPlanID = int64_t;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query id is better than plan id.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For compatibility considered, I want to keep the idea Query ID for now, since we might have a truly global unique query id in further future.

} // namespace nebula
#endif // COMMON_THRIFT_THRIFTTYPES_H_