diff --git a/src/common/utils/NebulaKeyUtils.cpp b/src/common/utils/NebulaKeyUtils.cpp index 0323269cbf0..0a91dd7b622 100644 --- a/src/common/utils/NebulaKeyUtils.cpp +++ b/src/common/utils/NebulaKeyUtils.cpp @@ -261,4 +261,8 @@ std::string NebulaKeyUtils::adminTaskKey(int32_t seqId, JobID jobId, TaskID task return key; } +std::string NebulaKeyUtils::dataVersionKey() { + return "\xFF\xFF\xFF\xFF"; +} + } // namespace nebula diff --git a/src/common/utils/NebulaKeyUtils.h b/src/common/utils/NebulaKeyUtils.h index 62767661625..5b62dde095f 100644 --- a/src/common/utils/NebulaKeyUtils.h +++ b/src/common/utils/NebulaKeyUtils.h @@ -272,6 +272,8 @@ class NebulaKeyUtils final { static std::string adminTaskKey(int32_t seqId, JobID jobId, TaskID taskId); + static std::string dataVersionKey(); + static_assert(sizeof(NebulaKeyType) == sizeof(PartitionID)); private: diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index e0b3ec19011..8d8d68e680f 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -200,7 +200,9 @@ nebula_add_executable( SetupLogging.cpp SetupBreakpad.cpp OBJECTS - $ + $ + $ + $ $ $ $ diff --git a/src/daemons/StandAloneDaemon.cpp b/src/daemons/StandAloneDaemon.cpp index 8ea8ea1fcb5..cd5c005e0ff 100644 --- a/src/daemons/StandAloneDaemon.cpp +++ b/src/daemons/StandAloneDaemon.cpp @@ -21,10 +21,11 @@ #include "common/ssl/SSLConfig.h" #include "common/time/TimezoneInfo.h" #include "common/utils/MetaKeyUtils.h" +#include "daemons/SetupLogging.h" #include "folly/ScopeGuard.h" #include "graph/service/GraphFlags.h" #include "graph/service/GraphService.h" -#include "graph/stats/StatsDef.h" +#include "graph/stats/GraphStats.h" #include "meta/MetaServiceHandler.h" #include "meta/MetaVersionMan.h" #include "meta/RootUserMan.h" @@ -32,7 +33,9 @@ #include "meta/http/MetaHttpIngestHandler.h" #include "meta/http/MetaHttpReplaceHostHandler.h" #include "meta/processors/job/JobManager.h" +#include "meta/stats/MetaStats.h" #include "storage/StorageServer.h" +#include "storage/stats/StorageStats.h" #include "version/Version.h" #include "webservice/WebService.h" @@ -50,7 +53,6 @@ void printHelp(const char *prog); void stopAllDaemon(); static void signalHandler(int sig); static Status setupSignalHandler(); -extern Status setupLogging(); #if defined(__x86_64__) extern Status setupBreakpad(); #endif @@ -105,10 +107,12 @@ int main(int argc, char *argv[]) { if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl || FLAGS_enable_meta_ssl) { folly::ssl::init(); } - nebula::initCounters(); + nebula::initGraphStats(); + nebula::initMetaStats(); + nebula::initStorageStats(); // Setup logging - auto status = setupLogging(); + auto status = setupLogging(argv[0]); if (!status.ok()) { LOG(ERROR) << status; return EXIT_FAILURE; @@ -437,7 +441,7 @@ void setupThreadManager() { int numThreads = FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads : gServer->getNumIOWorkerThreads(); std::shared_ptr threadManager( - PriorityThreadManager::newPriorityThreadManager(numThreads, false /*stats*/)); + PriorityThreadManager::newPriorityThreadManager(numThreads)); threadManager->setNamePrefix("executor"); threadManager->start(); gServer->setThreadManager(threadManager); diff --git a/src/tools/db-upgrade/CMakeLists.txt b/src/tools/db-upgrade/CMakeLists.txt index 2f4bbdbee17..22c951d3ad9 100644 --- a/src/tools/db-upgrade/CMakeLists.txt +++ b/src/tools/db-upgrade/CMakeLists.txt @@ -5,6 +5,7 @@ nebula_add_executable( DbUpgraderTool.cpp NebulaKeyUtilsV1.cpp NebulaKeyUtilsV2.cpp + NebulaKeyUtilsV3.cpp DbUpgrader.cpp OBJECTS $ diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp index e5a6e3f6ec1..a22d6d62cf6 100644 --- a/src/tools/db-upgrade/DbUpgrader.cpp +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -9,8 +9,10 @@ #include "common/fs/FileUtils.h" #include "common/utils/IndexKeyUtils.h" #include "common/utils/NebulaKeyUtils.h" +#include "rocksdb/sst_file_writer.h" #include "tools/db-upgrade/NebulaKeyUtilsV1.h" #include "tools/db-upgrade/NebulaKeyUtilsV2.h" +#include "tools/db-upgrade/NebulaKeyUtilsV3.h" DEFINE_string(src_db_path, "", @@ -22,10 +24,11 @@ DEFINE_string(dst_db_path, "multi paths should be split by comma"); DEFINE_string(upgrade_meta_server, "127.0.0.1:45500", "Meta servers' address."); DEFINE_uint32(write_batch_num, 100, "The size of the batch written to rocksdb"); -DEFINE_uint32(upgrade_version, - 0, - "When the value is 1, upgrade the data from 1.x to 2.0 GA. " - "When the value is 2, upgrade the data from 2.0 RC to 2.0 GA."); +DEFINE_string(upgrade_version, + "", + "When the value is 1:2, upgrade the data from 1.x to 2.0 GA. " + "When the value is 2RC:2, upgrade the data from 2.0 RC to 2.0 GA." + "When the value is 2:3, upgrade the data from 2.0 GA to 3.0 ."); DEFINE_bool(compactions, true, "When the upgrade of the space is completed, " @@ -83,7 +86,7 @@ Status UpgraderSpace::initSpace(const std::string& sId) { // Use readonly rocksdb readEngine_.reset(new nebula::kvstore::RocksEngine( - spaceId_, spaceVidLen_, srcPath_, "", nullptr, nullptr, true)); + spaceId_, spaceVidLen_, srcPath_, "", nullptr, nullptr, false)); writeEngine_.reset(new nebula::kvstore::RocksEngine(spaceId_, spaceVidLen_, dstPath_)); parts_.clear(); @@ -882,6 +885,114 @@ std::string UpgraderSpace::encodeRowVal(const RowReader* reader, return std::move(rowWrite).moveEncodedStr(); } +void UpgraderSpace::runPartV3() { + std::chrono::milliseconds take_dura{10}; + if (auto pId = partQueue_.try_take_for(take_dura)) { + PartitionID partId = *pId; + // Handle vertex and edge, if there is an index, generate index data + LOG(INFO) << "Start to handle vertex/edge/index data in space id " << spaceId_ << " part id " + << partId; + auto prefix = NebulaKeyUtilsV3::partTagPrefix(partId); + std::unique_ptr iter; + auto retCode = readEngine_->prefix(prefix, &iter); + if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Space id " << spaceId_ << " part " << partId << " no found!"; + LOG(ERROR) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " + << partId << " failed"; + + auto unFinishedPart = --unFinishedPart_; + if (unFinishedPart == 0) { + // all parts has finished + LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " + << spaceId_ << " finished"; + } else { + pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); + } + return; + } + auto write_sst = [&, this](const std::vector& data) { + ::rocksdb::Options option; + option.create_if_missing = true; + option.compression = ::rocksdb::CompressionType::kNoCompression; + ::rocksdb::SstFileWriter sst_file_writer(::rocksdb::EnvOptions(), option); + std::string file = ::fmt::format( + ".nebula_upgrade.space-{}.part-{}.{}.sst", spaceId_, partId, std::time(nullptr)); + ::rocksdb::Status s = sst_file_writer.Open(file); + if (!s.ok()) { + LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" + << s.code(); + } + for (auto item : data) { + s = sst_file_writer.Put(item.first, item.second); + if (!s.ok()) { + LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" + << s.code(); + } + } + s = sst_file_writer.Finish(); + if (!s.ok()) { + LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" + << s.code(); + } + std::lock_guard lck(this->ingest_sst_file_mut_); + ingest_sst_file_.push_back(file); + }; + std::vector data; + std::string lastVertexKey = ""; + while (iter && iter->valid()) { + auto vertex = NebulaKeyUtilsV3::getVertexKey(iter->key()); + if (vertex == lastVertexKey) { + iter->next(); + continue; + } + data.emplace_back(vertex, ""); + lastVertexKey = vertex; + if (data.size() >= 100000) { + write_sst(data); + data.clear(); + } + iter->next(); + } + if (!data.empty()) { + write_sst(data); + data.clear(); + } + LOG(INFO) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " << partId + << " succeed"; + + auto unFinishedPart = --unFinishedPart_; + if (unFinishedPart == 0) { + // all parts has finished + LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " + << spaceId_ << " finished."; + } else { + pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); + } + } else { + LOG(INFO) << "Handle vertex/edge/index of parts data in space id " << spaceId_ << " finished"; + } +} +void UpgraderSpace::doProcessV3() { + LOG(INFO) << "Start to handle data in space id " << spaceId_; + // Parallel process part + auto partConcurrency = std::min(static_cast(FLAGS_max_concurrent_parts), parts_.size()); + LOG(INFO) << "Max concurrent parts: " << partConcurrency; + unFinishedPart_ = parts_.size(); + + LOG(INFO) << "Start to handle vertex/edge/index of parts data in space id " << spaceId_; + for (size_t i = 0; i < partConcurrency; ++i) { + pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); + } + + while (unFinishedPart_ != 0) { + sleep(10); + } + auto code = readEngine_->ingest(ingest_sst_file_, true); + if (code != ::nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Faild upgrade 2:3 when ingest sst file:" << static_cast(code); + } + readEngine_->put(NebulaKeyUtils::dataVersionKey(), NebulaKeyUtilsV3::dataVersionValue()); +} std::vector UpgraderSpace::indexVertexKeys( PartitionID partId, VertexID& vId, @@ -1094,10 +1205,14 @@ void DbUpgrader::doSpace() { LOG(INFO) << "Upgrade from path " << upgraderSpaceIter->srcPath_ << " space id " << upgraderSpaceIter->entry_ << " to path " << upgraderSpaceIter->dstPath_ << " begin"; - if (FLAGS_upgrade_version == 1) { + if (FLAGS_upgrade_version == "1:2") { upgraderSpaceIter->doProcessV1(); - } else { + } else if (FLAGS_upgrade_version == "2RC:2") { upgraderSpaceIter->doProcessV2(); + } else if (FLAGS_upgrade_version == "2:3") { + upgraderSpaceIter->doProcessV3(); + } else { + LOG(FATAL) << "error upgrade version " << FLAGS_upgrade_version; } auto ret = upgraderSpaceIter->copyWal(); diff --git a/src/tools/db-upgrade/DbUpgrader.h b/src/tools/db-upgrade/DbUpgrader.h index 6851893f739..96c77dcacc0 100644 --- a/src/tools/db-upgrade/DbUpgrader.h +++ b/src/tools/db-upgrade/DbUpgrader.h @@ -23,7 +23,7 @@ DECLARE_string(src_db_path); DECLARE_string(dst_db_path); DECLARE_string(upgrade_meta_server); DECLARE_uint32(write_batch_num); -DECLARE_uint32(upgrade_version); +DECLARE_string(upgrade_version); DECLARE_bool(compactions); DECLARE_uint32(max_concurrent_parts); DECLARE_uint32(max_concurrent_spaces); @@ -55,6 +55,9 @@ class UpgraderSpace { // Processing v2 Rc data upgrade to v2 Ga void doProcessV2(); + // Processing v2 Ga data upgrade to v3 + void doProcessV3(); + // Perform manual compact void doCompaction(); @@ -111,6 +114,8 @@ class UpgraderSpace { void runPartV2(); + void runPartV3(); + public: // Source data path std::string srcPath_; @@ -159,6 +164,9 @@ class UpgraderSpace { folly::UnboundedBlockingQueue partQueue_; std::atomic unFinishedPart_; + + std::mutex ingest_sst_file_mut_; + std::vector ingest_sst_file_; }; // Upgrade one data path in storage conf diff --git a/src/tools/db-upgrade/DbUpgraderTool.cpp b/src/tools/db-upgrade/DbUpgraderTool.cpp index 94b35c51d22..f36ba943546 100644 --- a/src/tools/db-upgrade/DbUpgraderTool.cpp +++ b/src/tools/db-upgrade/DbUpgraderTool.cpp @@ -39,11 +39,12 @@ void printHelp() { A list of meta severs' ip:port separated by comma. Default: 127.0.0.1:45500 - --upgrade_version=<1|2> - This tool can only upgrade 1.x data or 2.0 RC data. - When the value is 1, upgrade the data from 1.x to 2.0 GA. - When the value is 2, upgrade the data from 2.0 RC to 2.0 GA. - Default: 0 + --upgrade_version=<1:2|2RC:2|2:3> + This tool can only upgrade 1.x data, 2.0 RC, or 2.0 GA data. + 1:2 upgrade the data from 1.x to 2.0GA + 2RC:2 upgrade the data from 2.0RC to 2.0GA + 2:3 upgrade the data from 2.0GA to 3.0 + Default: "" optional: --write_batch_num= @@ -164,9 +165,9 @@ int main(int argc, char* argv[]) { CHECK_NOTNULL(schemaMan); CHECK_NOTNULL(indexMan); - if (FLAGS_upgrade_version != 1 && FLAGS_upgrade_version != 2) { - LOG(ERROR) << "Flag upgrade_version : " << FLAGS_upgrade_version - << " illegal, upgrade_version can only be 1 or 2"; + std::vector versions = {"1:2", "2RC:2", "2:3"}; + if (std::find(versions.begin(), versions.end(), FLAGS_upgrade_version) == versions.end()) { + LOG(ERROR) << "Flag upgrade_version : " << FLAGS_upgrade_version; return EXIT_FAILURE; } LOG(INFO) << "Prepare phase end"; diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp new file mode 100644 index 00000000000..f6105094f4f --- /dev/null +++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp @@ -0,0 +1,26 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "tools/db-upgrade/NebulaKeyUtilsV3.h" + +namespace nebula { +std::string NebulaKeyUtilsV3::partTagPrefix(PartitionID partId) { + PartitionID item = (partId << kPartitionOffset) | static_cast(kTag_); + std::string key; + key.reserve(sizeof(PartitionID)); + key.append(reinterpret_cast(&item), sizeof(PartitionID)); + return key; +} +std::string NebulaKeyUtilsV3::getVertexKey(folly::StringPiece tagKey) { + std::string key = tagKey.toString(); + key[3] = static_cast(kVertex); + key.resize(key.size() - sizeof(TagID)); + return key; +} +std::string NebulaKeyUtilsV3::dataVersionValue() { + return "3.0"; +} + +} // namespace nebula diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.h b/src/tools/db-upgrade/NebulaKeyUtilsV3.h new file mode 100644 index 00000000000..55862dcd726 --- /dev/null +++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.h @@ -0,0 +1,18 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ +#pragma once +#include "common/utils/Types.h" +namespace nebula { +class NebulaKeyUtilsV3 { + public: + static std::string partTagPrefix(PartitionID partId); + static std::string getVertexKey(folly::StringPiece tagKey); + static std::string dataVersionValue(); + + private: + enum NebulaKeyTypeV3 : uint32_t { kTag_ = 0x00000001, kVertex = 0x00000007 }; +}; + +} // namespace nebula