Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix update sessions when leader change happens #5225

Merged
merged 4 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/graph/session/GraphSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "graph/session/GraphSessionManager.h"

#include "common/base/Base.h"
#include "common/base/Status.h"
#include "common/stats/StatsManager.h"
#include "common/time/WallClock.h"
#include "graph/service/GraphFlags.h"
Expand Down Expand Up @@ -258,8 +259,9 @@ void GraphSessionManager::updateSessionsToMeta() {
auto handleKilledQueries = [this](auto&& resp) {
if (!resp.ok()) {
LOG(ERROR) << "Update sessions failed: " << resp.status();
return Status::Error("Update sessions failed: %s", resp.status().toString().c_str());
return;
}

auto& killedQueriesForEachSession = *resp.value().killed_queries_ref();
for (auto& killedQueries : killedQueriesForEachSession) {
auto sessionId = killedQueries.first;
Expand All @@ -276,19 +278,24 @@ void GraphSessionManager::updateSessionsToMeta() {
VLOG(1) << "Kill query, session: " << sessionId << " plan: " << epId;
}
}
return Status::OK();
};

// The response from meta contains sessions that are marked as killed, so we need to clean the
// local cache and update statistics
auto handleKilledSessions = [this](auto&& resp) {
if (!resp.ok()) {
LOG(ERROR) << "Update sessions failed: " << resp.status();
return;
}

auto killSessions = resp.value().get_killed_sessions();
removeSessionFromLocalCache(killSessions);
};

auto result = metaClient_->updateSessions(sessions).get();
if (!result.ok()) {
LOG(ERROR) << "Update sessions failed: " << result;
LOG(ERROR) << "Update sessions failed: " << result.status();
return;
}
handleKilledQueries(result);
handleKilledSessions(result);
Expand Down
20 changes: 16 additions & 4 deletions src/meta/processors/session/SessionManagerProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,15 @@ void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) {
if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
LOG(INFO) << "Session id '" << sessionId << "' not found";
// If the session requested to be updated can not be found in meta, the session has been
// killed
// If the session requested to be updated can not be found in meta, we consider the session
// has been killed
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
killedSessions.emplace_back(sessionId);
continue;
} else {
handleErrorCode(errCode);
onFinished();
return;
}
}

Expand Down Expand Up @@ -169,10 +173,18 @@ void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) {
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
auto ret = doGet(sessionKey);

// If the session is not found, we should continue to remove other sessions.
if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
LOG(INFO) << "Session id `" << sessionId << "' not found";
continue;

// If the session is not found, we should continue to remove other sessions.
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
continue;
} else { // for other error like leader change, we handle the error and return.
handleErrorCode(errCode);
onFinished();
return;
}
}

// Remove session key from kvstore
Expand Down