Skip to content

Commit

Permalink
split stop to stop and join
Browse files Browse the repository at this point in the history
address comments

regular update

split stop to stop and join

remove dangle edge

split mem lock by term
  • Loading branch information
liuyu85cn committed Jan 18, 2022
1 parent 0a5df3f commit 5b963f3
Show file tree
Hide file tree
Showing 45 changed files with 790 additions and 672 deletions.
1 change: 0 additions & 1 deletion src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ void MetaClient::heartBeatThreadFunc() {
LOG(ERROR) << "Heartbeat failed, status:" << ret.status();
return;
}

// if MetaServer has some changes, refresh the localCache_
loadData();
loadCfg();
Expand Down
6 changes: 3 additions & 3 deletions src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ ::nebula::cpp2::ErrorCode getErrorCode(T& tryResp) {
switch (stResp.status().code()) {
case Status::Code::kLeaderChanged:
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
case Status::Code::kError:
return nebula::cpp2::ErrorCode::E_RPC_FAILURE;
default:
LOG(ERROR) << "not impl error transform: code="
<< static_cast<int32_t>(stResp.status().code());
Expand Down Expand Up @@ -69,8 +71,8 @@ void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedReq

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
VLOG(1) << "chainUpdateEdge rpc: " << apache::thrift::util::enumNameSafe(code);
if (code == ::nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainUpdateEdge(reversedRequest, termOfSrc, optVersion, std::move(p));
} else {
p.setValue(code);
Expand Down Expand Up @@ -108,7 +110,6 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainAddEdges(directReq, termId, optVersion, std::move(p));
} else {
p.setValue(code);
Expand Down Expand Up @@ -165,7 +166,6 @@ void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainDeleteEdges(req, txnId, termId, std::move(p));
} else {
p.setValue(code);
Expand Down
14 changes: 4 additions & 10 deletions src/common/utils/MemoryLockWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class MemoryLockGuard {
}

~MemoryLockGuard() {
if (locked_) {
if (locked_ && autoUnlock_) {
lock_->unlockBatch(keys_);
}
}
Expand All @@ -71,22 +71,16 @@ class MemoryLockGuard {
return *iter_;
}

// this will manual set the lock to unlocked state
// which mean will not release all locks automatically
// please make sure you really know the side effect
void forceLock() {
locked_ = true;
}

void forceUnlock() {
locked_ = false;
void setAutoUnlock(bool autoUnlock) {
autoUnlock_ = autoUnlock;
}

protected:
MemoryLockCore<Key>* lock_;
std::vector<Key> keys_;
typename std::vector<Key>::iterator iter_;
bool locked_{false};
bool autoUnlock_{true};
};

} // namespace nebula
1 change: 1 addition & 0 deletions src/mock/MockCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ void MockCluster::initStorageKV(const char* dataPath,

txnMan_ = std::make_unique<storage::TransactionManager>(storageEnv_.get());
storageEnv_->txnMan_ = txnMan_.get();
txnMan_->start();
}

void MockCluster::startStorage(HostAddr addr,
Expand Down
9 changes: 4 additions & 5 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,13 @@ nebula_add_library(
transaction/ConsistUtil.cpp
transaction/ChainUpdateEdgeLocalProcessor.cpp
transaction/ChainUpdateEdgeRemoteProcessor.cpp
transaction/ChainResumeProcessor.cpp
transaction/ChainAddEdgesGroupProcessor.cpp
transaction/ChainAddEdgesLocalProcessor.cpp
transaction/ChainAddEdgesRemoteProcessor.cpp
transaction/ResumeAddEdgeProcessor.cpp
transaction/ResumeAddEdgeRemoteProcessor.cpp
transaction/ResumeUpdateProcessor.cpp
transaction/ResumeUpdateRemoteProcessor.cpp
transaction/ChainResumeAddPrimeProcessor.cpp
transaction/ChainResumeAddDoublePrimeProcessor.cpp
transaction/ChainResumeUpdatePrimeProcessor.cpp
transaction/ChainResumeUpdateDoublePrimeProcessor.cpp
transaction/ChainProcessorFactory.cpp
transaction/ChainDeleteEdgesGroupProcessor.cpp
transaction/ChainDeleteEdgesLocalProcessor.cpp
Expand Down
7 changes: 4 additions & 3 deletions src/storage/InternalStorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ class InternalStorageServiceHandler final : public cpp2::InternalStorageServiceS
public:
explicit InternalStorageServiceHandler(StorageEnv* env);

folly::Future<cpp2::ExecResponse> future_chainAddEdges(const cpp2::ChainAddEdgesRequest& p_req);
folly::Future<cpp2::ExecResponse> future_chainAddEdges(
const cpp2::ChainAddEdgesRequest& p_req) override;

folly::Future<cpp2::UpdateResponse> future_chainUpdateEdge(
const cpp2::ChainUpdateEdgeRequest& p_req);
const cpp2::ChainUpdateEdgeRequest& p_req) override;

folly::Future<cpp2::ExecResponse> future_chainDeleteEdges(
const cpp2::ChainDeleteEdgesRequest& p_req);
const cpp2::ChainDeleteEdgesRequest& p_req) override;

private:
StorageEnv* env_{nullptr};
Expand Down
6 changes: 5 additions & 1 deletion src/storage/StorageServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ int32_t StorageServer::getAdminStoreSeqId() {

bool StorageServer::start() {
ioThreadPool_ = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);
ioThreadPoolForMeta_ = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);
#ifndef BUILD_STANDALONE
const int32_t numWorkerThreads = FLAGS_num_worker_threads;
#else
Expand All @@ -183,7 +184,7 @@ bool StorageServer::start() {
options.rootPath_ = boost::filesystem::current_path().string();
options.dataPaths_ = dataPaths_;

metaClient_ = std::make_unique<meta::MetaClient>(ioThreadPool_, metaAddrs_, options);
metaClient_ = std::make_unique<meta::MetaClient>(ioThreadPoolForMeta_, metaAddrs_, options);
if (!metaClient_->waitForMetadReady()) {
LOG(ERROR) << "waitForMetadReady error!";
return false;
Expand Down Expand Up @@ -222,6 +223,8 @@ bool StorageServer::start() {
env_->interClient_ = interClient_.get();

txnMan_ = std::make_unique<TransactionManager>(env_.get());
txnMan_->monitorPoolStat(ioThreadPool_.get(), "ioThreadPool_");
txnMan_->monitorPoolStat(ioThreadPoolForMeta_.get(), "ioThreadPoolForMeta_");
if (!txnMan_->start()) {
LOG(ERROR) << "Start transaction manager failed!";
return false;
Expand Down Expand Up @@ -397,6 +400,7 @@ void StorageServer::stop() {

if (txnMan_) {
txnMan_->stop();
txnMan_->join();
}
if (taskMgr_) {
taskMgr_->shutdown();
Expand Down
1 change: 1 addition & 0 deletions src/storage/StorageServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class StorageServer final {
bool initWebService();

std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPoolForMeta_;
std::shared_ptr<apache::thrift::concurrency::ThreadManager> workers_;

std::unique_ptr<std::thread> storageThread_;
Expand Down
3 changes: 3 additions & 0 deletions src/storage/index/LookupProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class LookupProcessor : public BaseProcessor<cpp2::LookupIndexResp> {
folly::Executor* executor_{nullptr};
std::unique_ptr<PlanContext> planContext_;
std::unique_ptr<RuntimeContext> context_;
/**
* @brief the final output
*/
nebula::DataSet resultDataSet_;
nebula::DataSet statsDataSet_;
std::vector<nebula::DataSet> partResults_;
Expand Down
3 changes: 3 additions & 0 deletions src/storage/kv/GetProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ namespace storage {

extern ProcessorCounters kGetCounters;

/**
* @brief this is a simple get() interface when storage run in KV mode.
*/
class GetProcessor : public BaseProcessor<cpp2::KVGetResponse> {
public:
static GetProcessor* instance(StorageEnv* env,
Expand Down
4 changes: 3 additions & 1 deletion src/storage/kv/PutProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ namespace nebula {
namespace storage {

extern ProcessorCounters kPutCounters;

/**
* @brief this is a simple put() interface when storage run in KV mode.
*/
class PutProcessor : public BaseProcessor<cpp2::ExecResponse> {
public:
static PutProcessor* instance(StorageEnv* env,
Expand Down
3 changes: 3 additions & 0 deletions src/storage/kv/RemoveProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ namespace storage {

extern ProcessorCounters kRemoveCounters;

/**
* @brief this is a simple remove() interface when storage run in KV mode.
*/
class RemoveProcessor : public BaseProcessor<cpp2::ExecResponse> {
public:
static RemoveProcessor* instance(StorageEnv* env,
Expand Down
15 changes: 15 additions & 0 deletions src/storage/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,21 @@ nebula_add_test(
gtest
)

nebula_add_test(
NAME
chain_delete_edge_test
SOURCES
ChainDeleteEdgesTest.cpp
OBJECTS
${storage_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
${PROXYGEN_LIBRARIES}
wangle
gtest
)

nebula_add_test(
NAME
index_test
Expand Down
2 changes: 2 additions & 0 deletions src/storage/test/ChainAddEdgesTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ TEST(ChainAddEdgesTest, processRemoteTest) {
} // namespace nebula

int main(int argc, char** argv) {
FLAGS_trace_toss = true;
FLAGS_v = 1;
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, false);
google::SetStderrLogging(google::INFO);
Expand Down
73 changes: 40 additions & 33 deletions src/storage/test/ChainDeleteEdgesTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ TEST(ChainDeleteEdgesTest, DISABLED_Test5) {
delProc->rcProcessRemote = nebula::cpp2::ErrorCode::SUCCEEDED;
delProc->rcProcessLocal = nebula::cpp2::ErrorCode::SUCCEEDED;

UPCLT iClient(FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED));
FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get());

LOG(INFO) << "Run DeleteEdgesReq...";
auto futDel = delProc->getFuture();
delProc->process(delReq);
Expand All @@ -231,16 +234,13 @@ TEST(ChainDeleteEdgesTest, DISABLED_Test5) {
LOG(INFO) << "after del(), edge num = " << num;
EXPECT_EQ(num, 167);

env->txnMan_->scanAll();
auto* iClient = FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED);
FakeInternalStorageClient::hookInternalStorageClient(env, iClient);
ChainResumeProcessor resumeProc(env);
resumeProc.process();
// std::this_thread::sleep_for(std::chrono::milliseconds());
for (PartitionID i = 1; i <= partNum; ++i) {
env->txnMan_->scanPrimes(mockSpaceId, i);
}
env->txnMan_->stop();
env->txnMan_->join();
num = util.checkNumOfKey(env, mockSpaceId, edgeKeys);
EXPECT_EQ(num, 0);

delete iClient;
}

// add some edges, then delete all of them, not execute local commit
Expand Down Expand Up @@ -277,6 +277,9 @@ TEST(ChainDeleteEdgesTest, Test6) {
delProc->rcProcessRemote = nebula::cpp2::ErrorCode::SUCCEEDED;
delProc->rcProcessLocal = nebula::cpp2::ErrorCode::SUCCEEDED;

UPCLT iClient(FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED));
FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get());

LOG(INFO) << "Run DeleteEdgesReq...";
auto futDel = delProc->getFuture();
delProc->process(delReq);
Expand All @@ -286,16 +289,18 @@ TEST(ChainDeleteEdgesTest, Test6) {
LOG(INFO) << "after del(), edge num = " << num;
EXPECT_EQ(num, 167);

env->txnMan_->scanAll();
auto* iClient = FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED);
FakeInternalStorageClient::hookInternalStorageClient(env, iClient);
ChainResumeProcessor resumeProc(env);
resumeProc.process();
for (PartitionID i = 1; i <= partNum; ++i) {
env->txnMan_->scanPrimes(mockSpaceId, i);
}
// ChainResumeProcessor resumeProc(env);
// resumeProc.process();
std::this_thread::sleep_for(std::chrono::seconds(2));
sleep(1);
env->txnMan_->stop();
env->txnMan_->join();

num = util.checkNumOfKey(env, mockSpaceId, edgeKeys);
EXPECT_EQ(num, 0);
std::this_thread::sleep_for(std::chrono::milliseconds(300));

delete iClient;
}

// add some edges, delete one of them, rpc failure
Expand Down Expand Up @@ -332,6 +337,9 @@ TEST(ChainDeleteEdgesTest, Test7) {
auto delReq = delProc->makeDelRequest(addReq, limit);
delProc->rcProcessRemote = nebula::cpp2::ErrorCode::E_RPC_FAILURE;

UPCLT iClient(FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED));
FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get());

LOG(INFO) << "Run DeleteEdgesReq...";
auto futDel = delProc->getFuture();
delProc->process(delReq);
Expand All @@ -341,20 +349,16 @@ TEST(ChainDeleteEdgesTest, Test7) {
LOG(INFO) << "after del(), edge num = " << num;
EXPECT_EQ(num, 166);

env->txnMan_->scanAll();
auto* iClient = FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED);
FakeInternalStorageClient::hookInternalStorageClient(env, iClient);
ChainResumeProcessor resumeProc(env);
resumeProc.process();
env->txnMan_->stop();
env->txnMan_->join();

LOG(INFO) << "after recover()";

num = util.checkNumOfKey(env, mockSpaceId, edgeKeys);
EXPECT_EQ(num, 166);
std::this_thread::sleep_for(std::chrono::milliseconds(300));

delete iClient;
}

// add some edges, then one all of them, rpc failure
// add some edges, delete all, rpc failure
TEST(ChainDeleteEdgesTest, Test8) {
fs::TempDir rootPath("/tmp/DeleteEdgesTest.XXXXXX");
mock::MockCluster cluster;
Expand Down Expand Up @@ -397,23 +401,26 @@ TEST(ChainDeleteEdgesTest, Test8) {
LOG(INFO) << "after del(), edge num = " << num;
EXPECT_EQ(num, 0);

env->txnMan_->scanAll();
auto* iClient = FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED);
FakeInternalStorageClient::hookInternalStorageClient(env, iClient);
ChainResumeProcessor resumeProc(env);
resumeProc.process();
// for (PartitionID i = 1; i <= partNum; ++i) {
// env->txnMan_->scanPrimes(mockSpaceId, i);
// }
UPCLT iClient(FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED));
FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get());
// ChainResumeProcessor resumeProc(env);
// resumeProc.process();

env->txnMan_->stop();
env->txnMan_->join();
num = util.checkNumOfKey(env, mockSpaceId, edgeKeys);
EXPECT_EQ(num, 0);
std::this_thread::sleep_for(std::chrono::milliseconds(300));

delete iClient;
}

} // namespace storage
} // namespace nebula

int main(int argc, char** argv) {
FLAGS_trace_toss = true;
FLAGS_v = 1;

testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, false);
Expand Down
Loading

0 comments on commit 5b963f3

Please sign in to comment.