From b3c8a7091c7d5560437a0cbf314eeb033fe27b31 Mon Sep 17 00:00:00 2001
From: "darion.yaphet"
Date: Mon, 7 Mar 2022 22:59:00 +0800
Subject: [PATCH] download and ingest job

---
 src/daemons/MetaDaemon.cpp | 4 +-
 src/daemons/MetaDaemonInit.cpp | 17 +-
 src/daemons/MetaDaemonInit.h | 5 +-
 src/daemons/StandAloneDaemon.cpp | 7 +-
 src/graph/executor/CMakeLists.txt | 2 -
 src/graph/executor/Executor.cpp | 8 -
 src/graph/executor/admin/DownloadExecutor.cpp | 33 ---
 src/graph/executor/admin/DownloadExecutor.h | 25 ---
 src/graph/executor/admin/IngestExecutor.cpp | 28 ---
 src/graph/executor/admin/IngestExecutor.h | 25 ---
 src/graph/planner/plan/Admin.h | 49 -----
 src/graph/planner/plan/PlanNode.cpp | 4 -
 src/graph/planner/plan/PlanNode.h | 2 -
 src/graph/service/PermissionCheck.cpp | 4 +-
 src/graph/validator/AdminJobValidator.h | 6 +-
 src/graph/validator/CMakeLists.txt | 2 -
 src/graph/validator/DownloadValidator.cpp | 30 ---
 src/graph/validator/DownloadValidator.h | 30 ---
 src/graph/validator/IngestValidator.cpp | 22 --
 src/graph/validator/IngestValidator.h | 29 ---
 src/graph/validator/Validator.cpp | 6 -
 src/meta/CMakeLists.txt | 7 +-
 src/meta/http/CMakeLists.txt | 5 +-
 src/meta/http/MetaHttpDownloadHandler.cpp | 205 ------------------
 src/meta/http/MetaHttpDownloadHandler.h | 67 ------
 src/meta/http/MetaHttpIngestHandler.cpp | 158 --------------
 src/meta/http/MetaHttpIngestHandler.h | 61 ------
 src/meta/http/test/CMakeLists.txt | 31 ---
 .../http/test/MetaHttpDownloadHandlerTest.cpp | 123 -----------
 src/meta/processors/job/CompactJobExecutor.h | 2 +-
 .../processors/job/DataBalanceJobExecutor.h | 2 +-
 .../processors/job/DownloadJobExecutor.cpp | 120 ++++++++++
 src/meta/processors/job/DownloadJobExecutor.h | 50 +++++
 src/meta/processors/job/FlushJobExecutor.h | 2 +-
 src/meta/processors/job/GetStatsProcessor.h | 2 +-
 src/meta/processors/job/IngestJobExecutor.cpp | 70 ++++++
 src/meta/processors/job/IngestJobExecutor.h | 41 ++++
 src/meta/processors/job/JobDescription.h | 6 +-
 src/meta/processors/job/JobExecutor.cpp | 15 +-
 src/meta/processors/job/JobManager.cpp | 15 +-
 src/meta/processors/job/JobManager.h | 2 +
 src/meta/test/JobManagerTest.cpp | 80 ++++++-
 src/meta/test/MockHdfsHelper.h | 34 +--
 src/parser/AdminSentences.cpp | 5 +-
 src/parser/AdminSentences.h | 9 -
 src/parser/MutateSentences.cpp | 9 -
 src/parser/MutateSentences.h | 69 ------
 src/parser/Sentence.h | 2 -
 src/parser/parser.yy | 32 ++-
 src/parser/test/ParserTest.cpp | 6 +
 src/storage/CMakeLists.txt | 4 +-
 src/storage/StorageServer.cpp | 12 -
 src/storage/admin/AdminTask.cpp | 8 +
 src/storage/admin/AdminTask.h | 19 +-
 src/storage/admin/AdminTaskProcessor.h | 1 -
 src/storage/admin/CompactTask.cpp | 4 +
 src/storage/admin/CompactTask.h | 3 +-
 src/storage/admin/DownloadTask.cpp | 59 +++++
 src/storage/admin/DownloadTask.h | 42 ++++
 src/storage/admin/FlushTask.cpp | 8 +-
 src/storage/admin/FlushTask.h | 4 +-
 src/storage/admin/IngestTask.cpp | 50 +++++
 src/storage/admin/IngestTask.h | 30 +++
 src/storage/admin/RebuildEdgeIndexTask.cpp | 4 +-
 src/storage/admin/RebuildFTIndexTask.cpp | 7 +-
 src/storage/admin/RebuildFTIndexTask.h | 4 +-
 src/storage/admin/RebuildIndexTask.cpp | 8 +-
 src/storage/admin/RebuildIndexTask.h | 2 +
 src/storage/admin/StatsTask.cpp | 9 +-
 src/storage/admin/StatsTask.h | 3 +-
 src/storage/admin/StopAdminTaskProcessor.h | 1 -
 .../http/StorageHttpDownloadHandler.cpp | 197 -----------------
 src/storage/http/StorageHttpDownloadHandler.h | 69 ------
 src/storage/http/StorageHttpIngestHandler.cpp | 103 ---------
src/storage/http/StorageHttpIngestHandler.h | 53 ----- src/storage/test/AdminTaskManagerTest.cpp | 7 +- src/storage/test/CMakeLists.txt | 36 +-- .../test/StorageHttpDownloadHandlerTest.cpp | 116 ---------- .../test/StorageHttpIngestHandlerTest.cpp | 104 --------- src/tools/CMakeLists.txt | 58 +++++ src/tools/db-dump/CMakeLists.txt | 58 ----- src/tools/db-upgrade/CMakeLists.txt | 55 +---- src/tools/meta-dump/CMakeLists.txt | 56 +---- src/tools/simple-kv-verify/CMakeLists.txt | 54 +---- src/tools/storage-perf/CMakeLists.txt | 61 +----- 85 files changed, 740 insertions(+), 2137 deletions(-) delete mode 100644 src/graph/executor/admin/DownloadExecutor.cpp delete mode 100644 src/graph/executor/admin/DownloadExecutor.h delete mode 100644 src/graph/executor/admin/IngestExecutor.cpp delete mode 100644 src/graph/executor/admin/IngestExecutor.h delete mode 100644 src/graph/validator/DownloadValidator.cpp delete mode 100644 src/graph/validator/DownloadValidator.h delete mode 100644 src/graph/validator/IngestValidator.cpp delete mode 100644 src/graph/validator/IngestValidator.h delete mode 100644 src/meta/http/MetaHttpDownloadHandler.cpp delete mode 100644 src/meta/http/MetaHttpDownloadHandler.h delete mode 100644 src/meta/http/MetaHttpIngestHandler.cpp delete mode 100644 src/meta/http/MetaHttpIngestHandler.h delete mode 100644 src/meta/http/test/MetaHttpDownloadHandlerTest.cpp create mode 100644 src/meta/processors/job/DownloadJobExecutor.cpp create mode 100644 src/meta/processors/job/DownloadJobExecutor.h create mode 100644 src/meta/processors/job/IngestJobExecutor.cpp create mode 100644 src/meta/processors/job/IngestJobExecutor.h create mode 100644 src/storage/admin/DownloadTask.cpp create mode 100644 src/storage/admin/DownloadTask.h create mode 100644 src/storage/admin/IngestTask.cpp create mode 100644 src/storage/admin/IngestTask.h delete mode 100644 src/storage/http/StorageHttpDownloadHandler.cpp delete mode 100644 src/storage/http/StorageHttpDownloadHandler.h delete mode 100644 src/storage/http/StorageHttpIngestHandler.cpp delete mode 100644 src/storage/http/StorageHttpIngestHandler.h delete mode 100644 src/storage/test/StorageHttpDownloadHandlerTest.cpp delete mode 100644 src/storage/test/StorageHttpIngestHandlerTest.cpp diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index c6a88cf6a7b..735a8a548db 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -26,8 +26,6 @@ #include "meta/MetaServiceHandler.h" #include "meta/MetaVersionMan.h" #include "meta/RootUserMan.h" -#include "meta/http/MetaHttpDownloadHandler.h" -#include "meta/http/MetaHttpIngestHandler.h" #include "meta/http/MetaHttpReplaceHostHandler.h" #include "meta/processors/job/JobManager.h" #include "meta/stats/MetaStats.h" @@ -152,7 +150,7 @@ int main(int argc, char* argv[]) { pool->start(FLAGS_meta_http_thread_num, "http thread pool"); auto webSvc = std::make_unique(); - status = initWebService(webSvc.get(), gKVStore.get(), helper.get(), pool.get()); + status = initWebService(webSvc.get(), gKVStore.get()); if (!status.ok()) { LOG(ERROR) << "Init web service failed: " << status; return EXIT_FAILURE; diff --git a/src/daemons/MetaDaemonInit.cpp b/src/daemons/MetaDaemonInit.cpp index 4ed4d3e3f25..03685d4502d 100644 --- a/src/daemons/MetaDaemonInit.cpp +++ b/src/daemons/MetaDaemonInit.cpp @@ -23,8 +23,6 @@ #include "meta/KVBasedClusterIdMan.h" #include "meta/MetaServiceHandler.h" #include "meta/MetaVersionMan.h" -#include "meta/http/MetaHttpDownloadHandler.h" -#include 
"meta/http/MetaHttpIngestHandler.h" #include "meta/http/MetaHttpReplaceHostHandler.h" #include "meta/processors/job/JobManager.h" #include "meta/stats/MetaStats.h" @@ -160,22 +158,9 @@ std::unique_ptr initKV(std::vector p return kvstore; } -nebula::Status initWebService(nebula::WebService* svc, - nebula::kvstore::KVStore* kvstore, - nebula::hdfs::HdfsCommandHelper* helper, - nebula::thread::GenericThreadPool* pool) { +nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore) { LOG(INFO) << "Starting Meta HTTP Service"; auto& router = svc->router(); - router.get("/download-dispatch").handler([kvstore, helper, pool](PathParams&&) { - auto handler = new nebula::meta::MetaHttpDownloadHandler(); - handler->init(kvstore, helper, pool); - return handler; - }); - router.get("/ingest-dispatch").handler([kvstore, pool](PathParams&&) { - auto handler = new nebula::meta::MetaHttpIngestHandler(); - handler->init(kvstore, pool); - return handler; - }); router.get("/replace").handler([kvstore](PathParams&&) { auto handler = new nebula::meta::MetaHttpReplaceHostHandler(); handler->init(kvstore); diff --git a/src/daemons/MetaDaemonInit.h b/src/daemons/MetaDaemonInit.h index 3d40ded967e..0a94ae4bbd4 100644 --- a/src/daemons/MetaDaemonInit.h +++ b/src/daemons/MetaDaemonInit.h @@ -17,8 +17,5 @@ nebula::ClusterID& metaClusterId(); std::unique_ptr initKV(std::vector peers, nebula::HostAddr localhost); -nebula::Status initWebService(nebula::WebService* svc, - nebula::kvstore::KVStore* kvstore, - nebula::hdfs::HdfsCommandHelper* helper, - nebula::thread::GenericThreadPool* pool); +nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore); #endif diff --git a/src/daemons/StandAloneDaemon.cpp b/src/daemons/StandAloneDaemon.cpp index 1e44e8a47bc..2f36a4f98cd 100644 --- a/src/daemons/StandAloneDaemon.cpp +++ b/src/daemons/StandAloneDaemon.cpp @@ -29,8 +29,6 @@ #include "meta/MetaServiceHandler.h" #include "meta/MetaVersionMan.h" #include "meta/RootUserMan.h" -#include "meta/http/MetaHttpDownloadHandler.h" -#include "meta/http/MetaHttpIngestHandler.h" #include "meta/http/MetaHttpReplaceHostHandler.h" #include "meta/processors/job/JobManager.h" #include "meta/stats/MetaStats.h" @@ -209,11 +207,8 @@ int main(int argc, char *argv[]) { return; } LOG(INFO) << "Start http service"; - auto helper = std::make_unique(); - auto pool = std::make_unique(); - pool->start(FLAGS_meta_http_thread_num, "http thread pool"); auto webSvc = std::make_unique(); - status = initWebService(webSvc.get(), gMetaKVStore.get(), helper.get(), pool.get()); + status = initWebService(webSvc.get(), gMetaKVStore.get()); if (!status.ok()) { LOG(ERROR) << "Init web service failed: " << status; return; diff --git a/src/graph/executor/CMakeLists.txt b/src/graph/executor/CMakeLists.txt index 2bb009d79bb..40f815b6379 100644 --- a/src/graph/executor/CMakeLists.txt +++ b/src/graph/executor/CMakeLists.txt @@ -68,8 +68,6 @@ nebula_add_library( admin/PartExecutor.cpp admin/CharsetExecutor.cpp admin/ShowStatsExecutor.cpp - admin/DownloadExecutor.cpp - admin/IngestExecutor.cpp admin/ConfigExecutor.cpp admin/ZoneExecutor.cpp admin/ShowServiceClientsExecutor.cpp diff --git a/src/graph/executor/Executor.cpp b/src/graph/executor/Executor.cpp index 5125de41377..27e24966d83 100644 --- a/src/graph/executor/Executor.cpp +++ b/src/graph/executor/Executor.cpp @@ -23,11 +23,9 @@ #include "graph/executor/admin/ConfigExecutor.h" #include "graph/executor/admin/CreateUserExecutor.h" #include 
"graph/executor/admin/DescribeUserExecutor.h" -#include "graph/executor/admin/DownloadExecutor.h" #include "graph/executor/admin/DropHostsExecutor.h" #include "graph/executor/admin/DropUserExecutor.h" #include "graph/executor/admin/GrantRoleExecutor.h" -#include "graph/executor/admin/IngestExecutor.h" #include "graph/executor/admin/KillQueryExecutor.h" #include "graph/executor/admin/ListRolesExecutor.h" #include "graph/executor/admin/ListUserRolesExecutor.h" @@ -518,12 +516,6 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) { case PlanNode::Kind::kSignOutService: { return pool->add(new SignOutServiceExecutor(node, qctx)); } - case PlanNode::Kind::kDownload: { - return pool->add(new DownloadExecutor(node, qctx)); - } - case PlanNode::Kind::kIngest: { - return pool->add(new IngestExecutor(node, qctx)); - } case PlanNode::Kind::kShowSessions: { return pool->add(new ShowSessionsExecutor(node, qctx)); } diff --git a/src/graph/executor/admin/DownloadExecutor.cpp b/src/graph/executor/admin/DownloadExecutor.cpp deleted file mode 100644 index 94e44b1d897..00000000000 --- a/src/graph/executor/admin/DownloadExecutor.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "graph/executor/admin/DownloadExecutor.h" - -#include "graph/context/QueryContext.h" -#include "graph/planner/plan/Admin.h" - -namespace nebula { -namespace graph { - -folly::Future DownloadExecutor::execute() { - SCOPED_TIMER(&execTime_); - auto *dNode = asNode(node()); - auto spaceId = qctx()->rctx()->session()->space().id; - return qctx() - ->getMetaClient() - ->download(dNode->getHdfsHost(), dNode->getHdfsPort(), dNode->getHdfsPath(), spaceId) - .via(runner()) - .thenValue([this](StatusOr resp) { - SCOPED_TIMER(&execTime_); - NG_RETURN_IF_ERROR(resp); - if (!resp.value()) { - return Status::Error("Download failed!"); - } - return Status::OK(); - }); -} - -} // namespace graph -} // namespace nebula diff --git a/src/graph/executor/admin/DownloadExecutor.h b/src/graph/executor/admin/DownloadExecutor.h deleted file mode 100644 index c0912c9aedf..00000000000 --- a/src/graph/executor/admin/DownloadExecutor.h +++ /dev/null @@ -1,25 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef GRAPH_EXECUTOR_ADMIN_DOWNLOADEXECUTOR_H_ -#define GRAPH_EXECUTOR_ADMIN_DOWNLOADEXECUTOR_H_ - -#include "graph/executor/Executor.h" - -namespace nebula { -namespace graph { - -class DownloadExecutor final : public Executor { - public: - DownloadExecutor(const PlanNode *node, QueryContext *qctx) - : Executor("DownloadExecutor", node, qctx) {} - - folly::Future execute() override; -}; - -} // namespace graph -} // namespace nebula - -#endif // GRAPH_EXECUTOR_ADMIN_DOWNLOADEXECUTOR_H_ diff --git a/src/graph/executor/admin/IngestExecutor.cpp b/src/graph/executor/admin/IngestExecutor.cpp deleted file mode 100644 index 9cb397e3006..00000000000 --- a/src/graph/executor/admin/IngestExecutor.cpp +++ /dev/null @@ -1,28 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#include "graph/executor/admin/IngestExecutor.h" - -#include "graph/context/QueryContext.h" -#include "graph/planner/plan/Admin.h" - -namespace nebula { -namespace graph { - -folly::Future IngestExecutor::execute() { - auto spaceId = qctx()->rctx()->session()->space().id; - return qctx()->getMetaClient()->ingest(spaceId).via(runner()).thenValue( - [this](StatusOr resp) { - SCOPED_TIMER(&execTime_); - NG_RETURN_IF_ERROR(resp); - if (!resp.value()) { - return Status::Error("Ingest failed!"); - } - return Status::OK(); - }); -} - -} // namespace graph -} // namespace nebula diff --git a/src/graph/executor/admin/IngestExecutor.h b/src/graph/executor/admin/IngestExecutor.h deleted file mode 100644 index 8a84c1dbf67..00000000000 --- a/src/graph/executor/admin/IngestExecutor.h +++ /dev/null @@ -1,25 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef GRAPH_EXECUTOR_ADMIN_INGESTEXECUTOR_H_ -#define GRAPH_EXECUTOR_ADMIN_INGESTEXECUTOR_H_ - -#include "graph/executor/Executor.h" - -namespace nebula { -namespace graph { - -class IngestExecutor final : public Executor { - public: - IngestExecutor(const PlanNode *node, QueryContext *qctx) - : Executor("IngestExecutor", node, qctx) {} - - folly::Future execute() override; -}; - -} // namespace graph -} // namespace nebula - -#endif // GRAPH_EXECUTOR_ADMIN_INGESTEXECUTOR_H_ diff --git a/src/graph/planner/plan/Admin.h b/src/graph/planner/plan/Admin.h index cc5973286d9..3b390a47f16 100644 --- a/src/graph/planner/plan/Admin.h +++ b/src/graph/planner/plan/Admin.h @@ -537,55 +537,6 @@ class ShowListener final : public SingleDependencyNode { : SingleDependencyNode(qctx, Kind::kShowListener, input) {} }; -class Download final : public SingleDependencyNode { - public: - static Download* make(QueryContext* qctx, - PlanNode* input, - std::string hdfsHost, - int32_t hdfsPort, - std::string hdfsPath) { - return qctx->objPool()->add(new Download(qctx, input, hdfsHost, hdfsPort, hdfsPath)); - } - - const std::string& getHdfsHost() const { - return hdfsHost_; - } - - int32_t getHdfsPort() const { - return hdfsPort_; - } - - const std::string& getHdfsPath() const { - return hdfsPath_; - } - - private: - Download(QueryContext* qctx, - PlanNode* dep, - std::string hdfsHost, - int32_t hdfsPort, - std::string hdfsPath) - : SingleDependencyNode(qctx, Kind::kDownload, dep), - hdfsHost_(hdfsHost), - hdfsPort_(hdfsPort), - hdfsPath_(hdfsPath) {} - - private: - std::string hdfsHost_; - int32_t hdfsPort_; - std::string hdfsPath_; -}; - -class Ingest final : public SingleDependencyNode { - public: - static Ingest* make(QueryContext* qctx, PlanNode* dep) { - return qctx->objPool()->add(new Ingest(qctx, dep)); - } - - private: - Ingest(QueryContext* qctx, PlanNode* dep) : SingleDependencyNode(qctx, Kind::kIngest, dep) {} -}; - // User related Node class CreateUser final : public CreateNode { public: diff --git a/src/graph/planner/plan/PlanNode.cpp b/src/graph/planner/plan/PlanNode.cpp index dfb4978efd0..8ceade20b92 100644 --- a/src/graph/planner/plan/PlanNode.cpp +++ b/src/graph/planner/plan/PlanNode.cpp @@ -277,10 +277,6 @@ const char* PlanNode::toString(PlanNode::Kind kind) { return "SignInService"; case Kind::kSignOutService: return "SignOutService"; - case Kind::kDownload: - return "Download"; - case Kind::kIngest: - return "Ingest"; case Kind::kShowSessions: return "ShowSessions"; case Kind::kUpdateSession: diff --git a/src/graph/planner/plan/PlanNode.h 
b/src/graph/planner/plan/PlanNode.h index beff38e0aa9..9c1e49b7cd9 100644 --- a/src/graph/planner/plan/PlanNode.h +++ b/src/graph/planner/plan/PlanNode.h @@ -175,8 +175,6 @@ class PlanNode { kShowFTIndexes, kSignInService, kSignOutService, - kDownload, - kIngest, kShowSessions, kUpdateSession, diff --git a/src/graph/service/PermissionCheck.cpp b/src/graph/service/PermissionCheck.cpp index 2ff0475b27a..8939a249ae5 100644 --- a/src/graph/service/PermissionCheck.cpp +++ b/src/graph/service/PermissionCheck.cpp @@ -11,7 +11,7 @@ namespace graph { /** * Read space : kUse, kDescribeSpace * Write space : kCreateSpace, kDropSpace, kClearSpace, kCreateSnapshot, - * kDropSnapshot, kBalance, kAdmin, kConfig, kIngest, kDownload + * kDropSnapshot, kBalance, kAdmin, kConfig * Read schema : kDescribeTag, kDescribeEdge, * kDescribeTagIndex, kDescribeEdgeIndex * Write schema : kCreateTag, kAlterTag, kCreateEdge, @@ -68,8 +68,6 @@ namespace graph { case Sentence::Kind::kShowConfigs: case Sentence::Kind::kSetConfig: case Sentence::Kind::kGetConfig: - case Sentence::Kind::kIngest: - case Sentence::Kind::kDownload: case Sentence::Kind::kSignOutService: case Sentence::Kind::kSignInService: { return PermissionManager::canWriteSpace(session); diff --git a/src/graph/validator/AdminJobValidator.h b/src/graph/validator/AdminJobValidator.h index e6930fa9ff6..65d9df8b869 100644 --- a/src/graph/validator/AdminJobValidator.h +++ b/src/graph/validator/AdminJobValidator.h @@ -37,14 +37,12 @@ class AdminJobValidator final : public Validator { case meta::cpp2::JobType::STATS: case meta::cpp2::JobType::COMPACT: case meta::cpp2::JobType::FLUSH: + case meta::cpp2::JobType::DOWNLOAD: + case meta::cpp2::JobType::INGEST: case meta::cpp2::JobType::DATA_BALANCE: case meta::cpp2::JobType::LEADER_BALANCE: case meta::cpp2::JobType::ZONE_BALANCE: return true; - // TODO: download and ingest need to be refactored to use the rpc protocol. - // Currently they are using their own validator - case meta::cpp2::JobType::DOWNLOAD: - case meta::cpp2::JobType::INGEST: case meta::cpp2::JobType::UNKNOWN: return false; } diff --git a/src/graph/validator/CMakeLists.txt b/src/graph/validator/CMakeLists.txt index 722f3725a36..ac7dd18981e 100644 --- a/src/graph/validator/CMakeLists.txt +++ b/src/graph/validator/CMakeLists.txt @@ -27,8 +27,6 @@ nebula_add_library( FindPathValidator.cpp LookupValidator.cpp MatchValidator.cpp - DownloadValidator.cpp - IngestValidator.cpp ) nebula_add_subdirectory(test) diff --git a/src/graph/validator/DownloadValidator.cpp b/src/graph/validator/DownloadValidator.cpp deleted file mode 100644 index 73b4797ec5a..00000000000 --- a/src/graph/validator/DownloadValidator.cpp +++ /dev/null @@ -1,30 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "graph/validator/DownloadValidator.h" - -#include "graph/planner/plan/Admin.h" -#include "parser/MutateSentences.h" - -namespace nebula { -namespace graph { - -// Plan to download SST file from HDFS -Status DownloadValidator::toPlan() { - auto sentence = static_cast(sentence_); - if (sentence->host() == nullptr || sentence->port() == 0 || sentence->path() == nullptr) { - return Status::SemanticError( - "HDFS path illegal." 
- "Should be HDFS://${HDFS_HOST}:${HDFS_PORT}/${HDFS_PATH}"); - } - auto *doNode = - Download::make(qctx_, nullptr, *sentence->host(), sentence->port(), *sentence->path()); - root_ = doNode; - tail_ = root_; - return Status::OK(); -} - -} // namespace graph -} // namespace nebula diff --git a/src/graph/validator/DownloadValidator.h b/src/graph/validator/DownloadValidator.h deleted file mode 100644 index 27ff4bab89a..00000000000 --- a/src/graph/validator/DownloadValidator.h +++ /dev/null @@ -1,30 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef GRAPH_VALIDATOR_DOWNLOADVALIDATOR_H_ -#define GRAPH_VALIDATOR_DOWNLOADVALIDATOR_H_ - -#include "graph/validator/Validator.h" -#include "parser/AdminSentences.h" - -namespace nebula { -namespace graph { - -class DownloadValidator final : public Validator { - public: - DownloadValidator(Sentence* sentence, QueryContext* context) : Validator(sentence, context) {} - - private: - Status validateImpl() override { - return Status::OK(); - } - - Status toPlan() override; -}; - -} // namespace graph -} // namespace nebula - -#endif // GRAPH_VALIDATOR_DOWNLOADVALIDATOR_H_ diff --git a/src/graph/validator/IngestValidator.cpp b/src/graph/validator/IngestValidator.cpp deleted file mode 100644 index 6798e4271f0..00000000000 --- a/src/graph/validator/IngestValidator.cpp +++ /dev/null @@ -1,22 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "graph/validator/IngestValidator.h" - -#include "graph/planner/plan/Admin.h" - -namespace nebula { -namespace graph { - -// Plan to ingest SST file in server side -Status IngestValidator::toPlan() { - auto *doNode = Ingest::make(qctx_, nullptr); - root_ = doNode; - tail_ = root_; - return Status::OK(); -} - -} // namespace graph -} // namespace nebula diff --git a/src/graph/validator/IngestValidator.h b/src/graph/validator/IngestValidator.h deleted file mode 100644 index eb560626963..00000000000 --- a/src/graph/validator/IngestValidator.h +++ /dev/null @@ -1,29 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#ifndef GRAPH_VALIDATOR_INGESTVALIDATOR_H_ -#define GRAPH_VALIDATOR_INGESTVALIDATOR_H_ - -#include "graph/validator/Validator.h" - -namespace nebula { -namespace graph { - -class IngestValidator final : public Validator { - public: - IngestValidator(Sentence* sentence, QueryContext* context) : Validator(sentence, context) {} - - private: - Status validateImpl() override { - return Status::OK(); - } - - Status toPlan() override; -}; - -} // namespace graph -} // namespace nebula - -#endif // GRAPH_VALIDATOR_INGESTVALIDATOR_H_ diff --git a/src/graph/validator/Validator.cpp b/src/graph/validator/Validator.cpp index d87ea35bec7..58330d75132 100644 --- a/src/graph/validator/Validator.cpp +++ b/src/graph/validator/Validator.cpp @@ -16,7 +16,6 @@ #include "graph/validator/AdminJobValidator.h" #include "graph/validator/AdminValidator.h" #include "graph/validator/AssignmentValidator.h" -#include "graph/validator/DownloadValidator.h" #include "graph/validator/ExplainValidator.h" #include "graph/validator/FetchEdgesValidator.h" #include "graph/validator/FetchVerticesValidator.h" @@ -24,7 +23,6 @@ #include "graph/validator/GetSubgraphValidator.h" #include "graph/validator/GoValidator.h" #include "graph/validator/GroupByValidator.h" -#include "graph/validator/IngestValidator.h" #include "graph/validator/LimitValidator.h" #include "graph/validator/LookupValidator.h" #include "graph/validator/MaintainValidator.h" @@ -238,10 +236,6 @@ std::unique_ptr Validator::makeValidator(Sentence* sentence, QueryCon return std::make_unique(sentence, context); case Sentence::Kind::kSignOutService: return std::make_unique(sentence, context); - case Sentence::Kind::kDownload: - return std::make_unique(sentence, context); - case Sentence::Kind::kIngest: - return std::make_unique(sentence, context); case Sentence::Kind::kCreateFTIndex: return std::make_unique(sentence, context); case Sentence::Kind::kDropFTIndex: diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 1a16db69c6e..d805bdbd075 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -45,8 +45,6 @@ nebula_add_library( processors/admin/CreateSnapshotProcessor.cpp processors/admin/DropSnapshotProcessor.cpp processors/admin/ListSnapshotsProcessor.cpp - processors/job/BalancePlan.cpp - processors/job/BalanceTask.cpp processors/admin/AdminClient.cpp processors/admin/SnapShot.cpp processors/admin/CreateBackupProcessor.cpp @@ -70,6 +68,10 @@ nebula_add_library( processors/job/SimpleConcurrentJobExecutor.cpp processors/job/CompactJobExecutor.cpp processors/job/FlushJobExecutor.cpp + processors/job/DownloadJobExecutor.cpp + processors/job/IngestJobExecutor.cpp + processors/job/BalancePlan.cpp + processors/job/BalanceTask.cpp processors/job/BalanceJobExecutor.cpp processors/job/ZoneBalanceJobExecutor.cpp processors/job/DataBalanceJobExecutor.cpp @@ -112,6 +114,7 @@ add_dependencies( ) set(meta_test_deps + $ $ $ $ diff --git a/src/meta/http/CMakeLists.txt b/src/meta/http/CMakeLists.txt index 34d2e95b4fe..c346be8d85d 100644 --- a/src/meta/http/CMakeLists.txt +++ b/src/meta/http/CMakeLists.txt @@ -2,12 +2,9 @@ # # This source code is licensed under Apache 2.0 License. 
- nebula_add_library( meta_http_handler OBJECT - MetaHttpIngestHandler.cpp - MetaHttpDownloadHandler.cpp MetaHttpReplaceHostHandler.cpp - ) +) nebula_add_subdirectory(test) diff --git a/src/meta/http/MetaHttpDownloadHandler.cpp b/src/meta/http/MetaHttpDownloadHandler.cpp deleted file mode 100644 index 6c88e6927db..00000000000 --- a/src/meta/http/MetaHttpDownloadHandler.cpp +++ /dev/null @@ -1,205 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "meta/http/MetaHttpDownloadHandler.h" - -#include -#include -#include -#include -#include - -#include "common/hdfs/HdfsHelper.h" -#include "common/http/HttpClient.h" -#include "common/network/NetworkUtils.h" -#include "common/process/ProcessUtils.h" -#include "common/thread/GenericThreadPool.h" -#include "common/utils/MetaKeyUtils.h" -#include "webservice/Common.h" -#include "webservice/WebService.h" - -DECLARE_int32(ws_storage_http_port); - -namespace nebula { -namespace meta { - -using proxygen::HTTPMessage; -using proxygen::HTTPMethod; -using proxygen::ProxygenError; -using proxygen::ResponseBuilder; -using proxygen::UpgradeProtocol; - -void MetaHttpDownloadHandler::init(nebula::kvstore::KVStore *kvstore, - nebula::hdfs::HdfsHelper *helper, - nebula::thread::GenericThreadPool *pool) { - kvstore_ = kvstore; - helper_ = helper; - pool_ = pool; - CHECK_NOTNULL(kvstore_); - CHECK_NOTNULL(helper_); - CHECK_NOTNULL(pool_); -} - -void MetaHttpDownloadHandler::onRequest(std::unique_ptr headers) noexcept { - if (headers->getMethod().value() != HTTPMethod::GET) { - // Unsupported method - err_ = HttpCode::E_UNSUPPORTED_METHOD; - return; - } - - if (!headers->hasQueryParam("host") || !headers->hasQueryParam("port") || - !headers->hasQueryParam("path") || !headers->hasQueryParam("space")) { - LOG(INFO) << "Illegal Argument"; - err_ = HttpCode::E_ILLEGAL_ARGUMENT; - return; - } - - hdfsHost_ = headers->getQueryParam("host"); - hdfsPort_ = headers->getIntQueryParam("port"); - hdfsPath_ = headers->getQueryParam("path"); - spaceID_ = headers->getIntQueryParam("space"); -} - -void MetaHttpDownloadHandler::onBody(std::unique_ptr) noexcept { - // Do nothing, we only support GET -} - -void MetaHttpDownloadHandler::onEOM() noexcept { - switch (err_) { - case HttpCode::E_UNSUPPORTED_METHOD: - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), - WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) - .sendWithEOM(); - return; - case HttpCode::E_ILLEGAL_ARGUMENT: - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::BAD_REQUEST), - WebServiceUtils::toString(HttpStatusCode::BAD_REQUEST)) - .sendWithEOM(); - return; - default: - break; - } - - if (helper_->checkHadoopPath()) { - if (dispatchSSTFiles(hdfsHost_, hdfsPort_, hdfsPath_)) { - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::OK), - WebServiceUtils::toString(HttpStatusCode::OK)) - .body("SSTFile dispatch successfully") - .sendWithEOM(); - } else { - LOG(INFO) << "SSTFile dispatch failed"; - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::FORBIDDEN), - WebServiceUtils::toString(HttpStatusCode::FORBIDDEN)) - .body("SSTFile dispatch failed") - .sendWithEOM(); - } - } else { - LOG(INFO) << "Hadoop Home not exist"; - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::NOT_FOUND), - WebServiceUtils::toString(HttpStatusCode::NOT_FOUND)) - .sendWithEOM(); - } -} - -void 
MetaHttpDownloadHandler::onUpgrade(UpgradeProtocol) noexcept { - // Do nothing -} - -void MetaHttpDownloadHandler::requestComplete() noexcept { - delete this; -} - -void MetaHttpDownloadHandler::onError(ProxygenError error) noexcept { - LOG(INFO) << "Web Service MetaHttpDownloadHandler got error : " - << proxygen::getErrorString(error); -} - -bool MetaHttpDownloadHandler::dispatchSSTFiles(const std::string &hdfsHost, - int hdfsPort, - const std::string &hdfsPath) { - auto result = helper_->ls(hdfsHost, hdfsPort, hdfsPath); - if (!result.ok()) { - LOG(INFO) << "Dispatch SSTFile Failed"; - return false; - } - std::vector files; - folly::split("\n", result.value(), files, true); - - std::unique_ptr iter; - auto prefix = MetaKeyUtils::partPrefix(spaceID_); - auto ret = kvstore_->prefix(0, 0, prefix, &iter); - if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "Fetch Parts Failed"; - return false; - } - - int32_t partSize{0}; - std::unordered_map> hostPartition; - while (iter->valid()) { - auto key = iter->key(); - PartitionID partId; - memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID)); - for (auto host : MetaKeyUtils::parsePartVal(iter->val())) { - auto address = HostAddr(host.host, host.port); - auto addressIter = hostPartition.find(address); - if (addressIter == hostPartition.end()) { - std::vector partitions; - hostPartition.emplace(address, partitions); - } - hostPartition[address].emplace_back(partId); - } - partSize++; - iter->next(); - } - - std::vector> futures; - for (auto &pair : hostPartition) { - std::string partsStr; - folly::join(",", pair.second, partsStr); - - auto hostaddr = pair.first.host; - auto dispatcher = [hostaddr, hdfsHost, hdfsPort, hdfsPath, partsStr, this]() { - static const char *tmp = "http://%s:%d/%s?host=%s&port=%d&path=%s&parts=%s&space=%d"; - std::string url = folly::stringPrintf(tmp, - hostaddr.c_str(), - FLAGS_ws_storage_http_port, - "download", - hdfsHost.c_str(), - hdfsPort, - hdfsPath.c_str(), - partsStr.c_str(), - spaceID_); - auto downloadResult = nebula::http::HttpClient::get(url); - return downloadResult.ok() && downloadResult.value() == "SSTFile download successfully"; - }; - auto future = pool_->addTask(dispatcher); - futures.push_back(std::move(future)); - } - - bool successfully{true}; - auto tries = folly::collectAll(std::move(futures)).get(); - for (const auto &t : tries) { - if (t.hasException()) { - LOG(INFO) << "Download Failed: " << t.exception(); - successfully = false; - break; - } - if (!t.value()) { - successfully = false; - break; - } - } - - LOG(INFO) << "Download tasks have finished"; - return successfully; -} - -} // namespace meta -} // namespace nebula diff --git a/src/meta/http/MetaHttpDownloadHandler.h b/src/meta/http/MetaHttpDownloadHandler.h deleted file mode 100644 index 77ee273c686..00000000000 --- a/src/meta/http/MetaHttpDownloadHandler.h +++ /dev/null @@ -1,67 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef META_HTTP_METAHTTPDOWNLOADHANDLER_H_ -#define META_HTTP_METAHTTPDOWNLOADHANDLER_H_ - -#include - -#include "common/base/Base.h" -#include "common/hdfs/HdfsHelper.h" -#include "common/thread/GenericThreadPool.h" -#include "kvstore/KVStore.h" -#include "webservice/Common.h" - -namespace nebula { -namespace meta { - -using nebula::HttpCode; - -/** - * @brief Download sst files from hdfs to every storaged download folder. 
- * It will send download http request to every storaged, letting them - * download the corressponding sst files. - * Functions such as onRequest, onBody... and requestComplete are inherited - * from RequestHandler, we will check request parameters in onRequest and - * call main logic in onEOM. - */ -class MetaHttpDownloadHandler : public proxygen::RequestHandler { - public: - MetaHttpDownloadHandler() = default; - - void init(nebula::kvstore::KVStore *kvstore, - nebula::hdfs::HdfsHelper *helper, - nebula::thread::GenericThreadPool *pool); - - void onRequest(std::unique_ptr headers) noexcept override; - - void onBody(std::unique_ptr body) noexcept override; - - void onEOM() noexcept override; - - void onUpgrade(proxygen::UpgradeProtocol protocol) noexcept override; - - void requestComplete() noexcept override; - - void onError(proxygen::ProxygenError error) noexcept override; - - private: - bool dispatchSSTFiles(const std::string &host, int32_t port, const std::string &path); - - private: - HttpCode err_{HttpCode::SUCCEEDED}; - std::string hdfsHost_; - int32_t hdfsPort_; - std::string hdfsPath_; - GraphSpaceID spaceID_; - nebula::kvstore::KVStore *kvstore_; - nebula::hdfs::HdfsHelper *helper_; - nebula::thread::GenericThreadPool *pool_; -}; - -} // namespace meta -} // namespace nebula - -#endif // META_HTTP_METAHTTPDOWNLOADHANDLER_H_ diff --git a/src/meta/http/MetaHttpIngestHandler.cpp b/src/meta/http/MetaHttpIngestHandler.cpp deleted file mode 100644 index 27f76a890c3..00000000000 --- a/src/meta/http/MetaHttpIngestHandler.cpp +++ /dev/null @@ -1,158 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "meta/http/MetaHttpIngestHandler.h" - -#include -#include -#include - -#include "common/http/HttpClient.h" -#include "common/network/NetworkUtils.h" -#include "common/process/ProcessUtils.h" -#include "common/thread/GenericThreadPool.h" -#include "common/utils/MetaKeyUtils.h" -#include "webservice/Common.h" -#include "webservice/WebService.h" - -DECLARE_int32(ws_storage_http_port); - -namespace nebula { -namespace meta { - -using proxygen::HTTPMessage; -using proxygen::HTTPMethod; -using proxygen::ProxygenError; -using proxygen::ResponseBuilder; -using proxygen::UpgradeProtocol; - -void MetaHttpIngestHandler::init(nebula::kvstore::KVStore *kvstore, - nebula::thread::GenericThreadPool *pool) { - kvstore_ = kvstore; - pool_ = pool; - CHECK_NOTNULL(kvstore_); - CHECK_NOTNULL(pool_); -} - -void MetaHttpIngestHandler::onRequest(std::unique_ptr headers) noexcept { - if (headers->getMethod().value() != HTTPMethod::GET) { - // Unsupported method - err_ = HttpCode::E_UNSUPPORTED_METHOD; - return; - } - - if (!headers->hasQueryParam("space")) { - err_ = HttpCode::E_ILLEGAL_ARGUMENT; - return; - } - - space_ = headers->getIntQueryParam("space"); -} - -void MetaHttpIngestHandler::onBody(std::unique_ptr) noexcept { - // Do nothing, we only support GET -} - -void MetaHttpIngestHandler::onEOM() noexcept { - switch (err_) { - case HttpCode::E_UNSUPPORTED_METHOD: - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), - WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) - .sendWithEOM(); - return; - case HttpCode::E_ILLEGAL_ARGUMENT: - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::BAD_REQUEST), - WebServiceUtils::toString(HttpStatusCode::BAD_REQUEST)) - .sendWithEOM(); - return; - default: - break; - } - - if (ingestSSTFiles(space_)) { 
- LOG(INFO) << "SSTFile ingest successfully "; - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::OK), - WebServiceUtils::toString(HttpStatusCode::OK)) - .body("SSTFile ingest successfully") - .sendWithEOM(); - } else { - LOG(INFO) << "SSTFile ingest failed"; - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::FORBIDDEN), - WebServiceUtils::toString(HttpStatusCode::FORBIDDEN)) - .body("SSTFile ingest failed") - .sendWithEOM(); - } -} - -void MetaHttpIngestHandler::onUpgrade(UpgradeProtocol) noexcept { - // Do nothing -} - -void MetaHttpIngestHandler::requestComplete() noexcept { - delete this; -} - -void MetaHttpIngestHandler::onError(ProxygenError error) noexcept { - LOG(INFO) << "Web Service MetaHttpIngestHandler got error : " << proxygen::getErrorString(error); -} - -bool MetaHttpIngestHandler::ingestSSTFiles(GraphSpaceID space) { - std::unique_ptr iter; - auto prefix = MetaKeyUtils::partPrefix(space); - - static const GraphSpaceID metaSpaceId = 0; - static const PartitionID metaPartId = 0; - auto ret = kvstore_->prefix(metaSpaceId, metaPartId, prefix, &iter); - if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "Fetch Parts Failed"; - return false; - } - - std::set storageIPs; - while (iter->valid()) { - for (auto &host : MetaKeyUtils::parsePartVal(iter->val())) { - if (storageIPs.count(host.host) == 0) { - storageIPs.insert(std::move(host.host)); - } - } - iter->next(); - } - - std::vector> futures; - - for (auto &storageIP : storageIPs) { - auto dispatcher = [storageIP, space]() { - static const char *tmp = "http://%s:%d/ingest?space=%d"; - auto url = folly::stringPrintf(tmp, storageIP.c_str(), FLAGS_ws_storage_http_port, space); - auto ingestResult = nebula::http::HttpClient::get(url); - return ingestResult.ok() && ingestResult.value() == "SSTFile ingest successfully"; - }; - auto future = pool_->addTask(dispatcher); - futures.push_back(std::move(future)); - } - - bool successfully{true}; - auto tries = folly::collectAll(std::move(futures)).get(); - for (const auto &t : tries) { - if (t.hasException()) { - LOG(INFO) << "Ingest Failed: " << t.exception(); - successfully = false; - break; - } - if (!t.value()) { - successfully = false; - break; - } - } - LOG(INFO) << "Ingest tasks have finished"; - return successfully; -} - -} // namespace meta -} // namespace nebula diff --git a/src/meta/http/MetaHttpIngestHandler.h b/src/meta/http/MetaHttpIngestHandler.h deleted file mode 100644 index acddf4affc8..00000000000 --- a/src/meta/http/MetaHttpIngestHandler.h +++ /dev/null @@ -1,61 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef META_HTTP_METAHTTPINGESTHANDLER_H -#define META_HTTP_METAHTTPINGESTHANDLER_H - -#include - -#include "common/base/Base.h" -#include "common/thread/GenericThreadPool.h" -#include "kvstore/KVStore.h" -#include "webservice/Common.h" - -namespace nebula { -namespace meta { - -using nebula::HttpCode; - -/** - * @brief Ingest should be called after download successfully. - * It will instruct relative storaged to ingest sst files - * from local download folder by sending http request. - * It will handle one space each time. - * Functions such as onRequest, onBody... and requestComplete are inherited - * from RequestHandler, we will check request parameters in onRequest and - * call main logic in onEOM. 
- * - */ -class MetaHttpIngestHandler : public proxygen::RequestHandler { - public: - MetaHttpIngestHandler() = default; - - void init(nebula::kvstore::KVStore *kvstore, nebula::thread::GenericThreadPool *pool); - - void onRequest(std::unique_ptr headers) noexcept override; - - void onBody(std::unique_ptr body) noexcept override; - - void onEOM() noexcept override; - - void onUpgrade(proxygen::UpgradeProtocol protocol) noexcept override; - - void requestComplete() noexcept override; - - void onError(proxygen::ProxygenError error) noexcept override; - - bool ingestSSTFiles(GraphSpaceID space); - - private: - HttpCode err_{HttpCode::SUCCEEDED}; - GraphSpaceID space_; - nebula::kvstore::KVStore *kvstore_; - nebula::thread::GenericThreadPool *pool_; -}; - -} // namespace meta -} // namespace nebula - -#endif // META_HTTP_METAHTTPINGESTHANDLER_H diff --git a/src/meta/http/test/CMakeLists.txt b/src/meta/http/test/CMakeLists.txt index 9f116aefe5e..c4d059fb41b 100644 --- a/src/meta/http/test/CMakeLists.txt +++ b/src/meta/http/test/CMakeLists.txt @@ -2,37 +2,6 @@ # # This source code is licensed under Apache 2.0 License. -nebula_add_test( - NAME meta_http_download_test - SOURCES MetaHttpDownloadHandlerTest.cpp - OBJECTS - $ - $ - ${meta_test_deps} - LIBRARIES - ${ROCKSDB_LIBRARIES} - ${THRIFT_LIBRARIES} - ${PROXYGEN_LIBRARIES} - wangle - gtest - ) - -nebula_add_test( - NAME meta_http_ingest_test - SOURCES MetaHttpIngestHandlerTest.cpp - OBJECTS - $ - $ - ${meta_test_deps} - LIBRARIES - ${ROCKSDB_LIBRARIES} - ${THRIFT_LIBRARIES} - ${PROXYGEN_LIBRARIES} - wangle - gtest - ) - - nebula_add_test( NAME meta_http_replace_test diff --git a/src/meta/http/test/MetaHttpDownloadHandlerTest.cpp b/src/meta/http/test/MetaHttpDownloadHandlerTest.cpp deleted file mode 100644 index 5661cbc10c5..00000000000 --- a/src/meta/http/test/MetaHttpDownloadHandlerTest.cpp +++ /dev/null @@ -1,123 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include - -#include "common/base/Base.h" -#include "common/fs/TempDir.h" -#include "common/http/HttpClient.h" -#include "meta/http/MetaHttpDownloadHandler.h" -#include "meta/test/MockHdfsHelper.h" -#include "meta/test/TestUtils.h" -#include "storage/http/StorageHttpDownloadHandler.h" -#include "webservice/Router.h" -#include "webservice/WebService.h" - -DECLARE_string(pid_file); -DECLARE_int32(ws_storage_http_port); - -namespace nebula { -namespace meta { - -std::unique_ptr helper = std::make_unique(); - -class MetaHttpDownloadHandlerTestEnv : public ::testing::Environment { - public: - void SetUp() override { - FLAGS_ws_ip = "127.0.0.1"; - FLAGS_ws_http_port = 0; - VLOG(1) << "Starting web service..."; - - rootPath_ = std::make_unique("/tmp/MetaHttpDownloadHandler.XXXXXX"); - kv_ = MockCluster::initMetaKV(rootPath_->path()); - TestUtils::createSomeHosts(kv_.get()); - TestUtils::assembleSpace(kv_.get(), 1, 2); - - // Because we reuse the kvstore for storage handler, let's add part - // manually. 
- auto* partMan = static_cast(kv_->partManager()); - partMan->addPart(1, 1); - partMan->addPart(1, 2); - - // wait for the leader election - sleep(3); - - pool_ = std::make_unique(); - pool_->start(3); - - webSvc_ = std::make_unique(); - auto& router = webSvc_->router(); - - router.get("/download-dispatch").handler([this](nebula::web::PathParams&&) { - auto handler = new meta::MetaHttpDownloadHandler(); - handler->init(kv_.get(), helper.get(), pool_.get()); - return handler; - }); - router.get("/download").handler([this](nebula::web::PathParams&&) { - auto handler = new storage::StorageHttpDownloadHandler(); - std::vector paths{rootPath_->path()}; - handler->init(helper.get(), pool_.get(), kv_.get(), paths); - return handler; - }); - auto status = webSvc_->start(); - FLAGS_ws_storage_http_port = FLAGS_ws_http_port; - ASSERT_TRUE(status.ok()) << status; - } - - void TearDown() override { - kv_.reset(); - rootPath_.reset(); - webSvc_.reset(); - pool_->stop(); - VLOG(1) << "Web service stopped"; - } - - private: - std::unique_ptr webSvc_; - std::unique_ptr rootPath_; - std::unique_ptr kv_; - std::unique_ptr pool_; -}; - -TEST(MetaHttpDownloadHandlerTest, MetaDownloadTest) { - { - auto url = "/download-dispatch"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_TRUE(resp.value().empty()); - } - { - auto url = "/download-dispatch?host=127.0.0.1&port=9000&path=/data&space=1"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile dispatch successfully", resp.value()); - } - { - helper = std::make_unique(); - auto url = "/download-dispatch?host=127.0.0.1&port=9000&path=/data&space=1"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile dispatch failed", resp.value()); - } -} - -} // namespace meta -} // namespace nebula - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - - ::testing::AddGlobalTestEnvironment(new nebula::meta::MetaHttpDownloadHandlerTestEnv()); - - return RUN_ALL_TESTS(); -} diff --git a/src/meta/processors/job/CompactJobExecutor.h b/src/meta/processors/job/CompactJobExecutor.h index b9bd56d36a2..11b32382984 100644 --- a/src/meta/processors/job/CompactJobExecutor.h +++ b/src/meta/processors/job/CompactJobExecutor.h @@ -12,7 +12,7 @@ namespace nebula { namespace meta { /** - * @brief Executor for compact job, always called by job manager + * @brief Executor for compact job, always called by job manager. */ class CompactJobExecutor : public SimpleConcurrentJobExecutor { public: diff --git a/src/meta/processors/job/DataBalanceJobExecutor.h b/src/meta/processors/job/DataBalanceJobExecutor.h index 0ba45a01728..c2dbad2ed6e 100644 --- a/src/meta/processors/job/DataBalanceJobExecutor.h +++ b/src/meta/processors/job/DataBalanceJobExecutor.h @@ -12,7 +12,7 @@ namespace nebula { namespace meta { /** - * @brief Executor for balance in zone, always called by job manager + * @brief Executor for balance in zone, always called by job manager. 
  */
 class DataBalanceJobExecutor : public BalanceJobExecutor {
   FRIEND_TEST(BalanceTest, BalanceDataPlanTest);
diff --git a/src/meta/processors/job/DownloadJobExecutor.cpp b/src/meta/processors/job/DownloadJobExecutor.cpp
new file mode 100644
index 00000000000..91720681061
--- /dev/null
+++ b/src/meta/processors/job/DownloadJobExecutor.cpp
@@ -0,0 +1,120 @@
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#include "meta/processors/job/DownloadJobExecutor.h"
+
+#include "common/hdfs/HdfsHelper.h"
+#include "common/utils/MetaKeyUtils.h"
+#include "meta/MetaServiceUtils.h"
+
+namespace nebula {
+namespace meta {
+
+DownloadJobExecutor::DownloadJobExecutor(GraphSpaceID space,
+                                         JobID jobId,
+                                         kvstore::KVStore* kvstore,
+                                         AdminClient* adminClient,
+                                         const std::vector<std::string>& paras)
+    : SimpleConcurrentJobExecutor(space, jobId, kvstore, adminClient, paras) {
+  helper_ = std::make_unique<hdfs::HdfsCommandHelper>();
+}
+
+bool DownloadJobExecutor::check() {
+  if (paras_.size() != 1) {
+    return false;
+  }
+
+  auto& url = paras_[0];
+  std::string hdfsPrefix = "hdfs://";
+  if (url.find(hdfsPrefix) != 0) {
+    LOG(ERROR) << "URL should start with " << hdfsPrefix;
+    return false;
+  }
+
+  auto u = url.substr(hdfsPrefix.size(), url.size());
+  std::vector<folly::StringPiece> tokens;
+  folly::split(":", u, tokens);
+  if (tokens.size() == 2) {
+    host_ = std::make_unique<std::string>(tokens[0]);
+    int32_t position = tokens[1].find_first_of("/");
+    if (position != -1) {
+      try {
+        port_ = folly::to<int32_t>(tokens[1].toString().substr(0, position).c_str());
+      } catch (const std::exception& ex) {
+        LOG(ERROR) << "URL's port parse failed: " << url;
+        return false;
+      }
+      path_ =
+          std::make_unique<std::string>(tokens[1].toString().substr(position, tokens[1].size()));
+    } else {
+      LOG(ERROR) << "URL Parse Failed: " << url;
+      return false;
+    }
+  } else {
+    LOG(ERROR) << "URL Parse Failed: " << url;
+    return false;
+  }
+
+  return true;
+}
+
+nebula::cpp2::ErrorCode DownloadJobExecutor::prepare() {
+  auto errOrHost = getTargetHost(space_);
+  if (!nebula::ok(errOrHost)) {
+    LOG(ERROR) << "Can't get any host according to space";
+    return nebula::error(errOrHost);
+  }
+
+  LOG(INFO) << "HDFS host: " << *host_.get() << " port: " << port_ << " path: " << *path_.get();
+
+  auto listResult = helper_->ls(*host_.get(), port_, *path_.get());
+  if (!listResult.ok()) {
+    LOG(ERROR) << "Dispatch SSTFile Failed";
+    return nebula::cpp2::ErrorCode::E_INVALID_JOB;
+  }
+
+  taskParameters_.emplace_back(*host_.get());
+  taskParameters_.emplace_back(folly::to<std::string>(port_));
+  taskParameters_.emplace_back(*path_.get());
+  std::unique_ptr<kvstore::KVIterator> iter;
+  auto prefix = MetaKeyUtils::partPrefix(space_);
+  auto result = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+  if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
+    LOG(ERROR) << "Fetch Parts Failed";
+  }
+  return result;
+}
+
+folly::Future<Status> DownloadJobExecutor::executeInternal(HostAddr&& address,
+                                                           std::vector<PartitionID>&& parts) {
+  taskParameters_.resize(3);
+  folly::Promise<Status> pro;
+  auto f = pro.getFuture();
+  adminClient_
+      ->addTask(cpp2::JobType::DOWNLOAD,
+                jobId_,
+                taskId_++,
+                space_,
+                std::move(address),
+                taskParameters_,
+                std::move(parts))
+      .then([pro = std::move(pro)](auto&& t) mutable {
+        CHECK(!t.hasException());
+        auto status = std::move(t).value();
+        if (status.ok()) {
+          pro.setValue(Status::OK());
+        } else {
+          pro.setValue(status.status());
+        }
+      });
+  return f;
+}
+
+nebula::cpp2::ErrorCode DownloadJobExecutor::stop() {
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
+}
+
+}  // namespace meta
+}  // namespace nebula
diff --git a/src/meta/processors/job/DownloadJobExecutor.h b/src/meta/processors/job/DownloadJobExecutor.h
new file mode 100644
index 00000000000..4c4c0c12cc5
--- /dev/null
+++ b/src/meta/processors/job/DownloadJobExecutor.h
@@ -0,0 +1,50 @@
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#ifndef META_DOWNLOADJOBEXECUTOR_H_
+#define META_DOWNLOADJOBEXECUTOR_H_
+
+#include "common/hdfs/HdfsCommandHelper.h"
+#include "meta/processors/job/SimpleConcurrentJobExecutor.h"
+
+namespace nebula {
+namespace meta {
+
+/**
+ * @brief Executor for download job, always called by job manager.
+ */
+class DownloadJobExecutor : public SimpleConcurrentJobExecutor {
+  FRIEND_TEST(JobManagerTest, DownloadJob);
+  FRIEND_TEST(JobManagerTest, IngestJob);
+
+ public:
+  DownloadJobExecutor(GraphSpaceID space,
+                      JobID jobId,
+                      kvstore::KVStore* kvstore,
+                      AdminClient* adminClient,
+                      const std::vector<std::string>& params);
+
+  bool check() override;
+
+  nebula::cpp2::ErrorCode prepare() override;
+
+  nebula::cpp2::ErrorCode stop() override;
+
+ protected:
+  folly::Future<Status> executeInternal(HostAddr&& address,
+                                        std::vector<PartitionID>&& parts) override;
+
+ private:
+  std::unique_ptr<std::string> host_;
+  int32_t port_;
+  std::unique_ptr<std::string> path_;
+  std::unique_ptr<hdfs::HdfsHelper> helper_;
+  std::vector<std::string> taskParameters_;
+};
+
+}  // namespace meta
+}  // namespace nebula
+
+#endif  // META_DOWNLOADJOBEXECUTOR_H_
diff --git a/src/meta/processors/job/FlushJobExecutor.h b/src/meta/processors/job/FlushJobExecutor.h
index f0b2dd0d953..d4d007b8eae 100644
--- a/src/meta/processors/job/FlushJobExecutor.h
+++ b/src/meta/processors/job/FlushJobExecutor.h
@@ -12,7 +12,7 @@ namespace nebula {
 namespace meta {
 
 /**
- * @brief Executor for flush job, always called by job manager
+ * @brief Executor for flush job, always called by job manager.
  */
 class FlushJobExecutor : public SimpleConcurrentJobExecutor {
  public:
diff --git a/src/meta/processors/job/GetStatsProcessor.h b/src/meta/processors/job/GetStatsProcessor.h
index 3d7d6214dbe..d62e8898d3e 100644
--- a/src/meta/processors/job/GetStatsProcessor.h
+++ b/src/meta/processors/job/GetStatsProcessor.h
@@ -12,7 +12,7 @@ namespace nebula {
 namespace meta {
 
 /**
- * @brief Get stats of a job, return error if the job is running or failed
+ * @brief Get stats of a job, return error if the job is running or failed.
  */
 class GetStatsProcessor : public BaseProcessor<cpp2::GetStatsResp> {
  public:
diff --git a/src/meta/processors/job/IngestJobExecutor.cpp b/src/meta/processors/job/IngestJobExecutor.cpp
new file mode 100644
index 00000000000..a672d6dfa85
--- /dev/null
+++ b/src/meta/processors/job/IngestJobExecutor.cpp
@@ -0,0 +1,70 @@
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#include "meta/processors/job/IngestJobExecutor.h"
+
+#include "common/utils/MetaKeyUtils.h"
+#include "meta/MetaServiceUtils.h"
+
+namespace nebula {
+namespace meta {
+
+IngestJobExecutor::IngestJobExecutor(GraphSpaceID space,
+                                     JobID jobId,
+                                     kvstore::KVStore* kvstore,
+                                     AdminClient* adminClient,
+                                     const std::vector<std::string>& paras)
+    : SimpleConcurrentJobExecutor(space, jobId, kvstore, adminClient, paras) {}
+
+bool IngestJobExecutor::check() {
+  return paras_.size() == 1;
+}
+
+nebula::cpp2::ErrorCode IngestJobExecutor::prepare() {
+  std::unique_ptr<kvstore::KVIterator> iter;
+  auto prefix = MetaKeyUtils::partPrefix(space_);
+  auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+  if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
+    LOG(ERROR) << "Fetch Parts Failed";
+    return code;
+  }
+
+  while (iter->valid()) {
+    for (auto& host : MetaKeyUtils::parsePartVal(iter->val())) {
+      if (storageHosts_.count(host.host) == 0) {
+        storageHosts_.insert(host.host);
+      }
+    }
+    iter->next();
+  }
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
+}
+
+folly::Future<Status> IngestJobExecutor::executeInternal(HostAddr&& address,
+                                                         std::vector<PartitionID>&& parts) {
+  folly::Promise<Status> pro;
+  auto f = pro.getFuture();
+  adminClient_
+      ->addTask(cpp2::JobType::INGEST,
+                jobId_,
+                taskId_++,
+                space_,
+                std::move(address),
+                taskParameters_,
+                std::move(parts))
+      .then([pro = std::move(pro)](auto&& t) mutable {
+        CHECK(!t.hasException());
+        auto status = std::move(t).value();
+        if (status.ok()) {
+          pro.setValue(Status::OK());
+        } else {
+          pro.setValue(status.status());
+        }
+      });
+  return f;
+}
+
+}  // namespace meta
+}  // namespace nebula
diff --git a/src/meta/processors/job/IngestJobExecutor.h b/src/meta/processors/job/IngestJobExecutor.h
new file mode 100644
index 00000000000..3b93dc056ca
--- /dev/null
+++ b/src/meta/processors/job/IngestJobExecutor.h
@@ -0,0 +1,41 @@
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#ifndef META_INGESTJOBEXECUTOR_H_
+#define META_INGESTJOBEXECUTOR_H_
+
+#include "meta/processors/job/MetaJobExecutor.h"
+#include "meta/processors/job/SimpleConcurrentJobExecutor.h"
+
+namespace nebula {
+namespace meta {
+
+/**
+ * @brief Executor for ingest job, always called by job manager.
+ */
+class IngestJobExecutor : public SimpleConcurrentJobExecutor {
+ public:
+  IngestJobExecutor(GraphSpaceID space,
+                    JobID jobId,
+                    kvstore::KVStore* kvstore,
+                    AdminClient* adminClient,
+                    const std::vector<std::string>& params);
+
+  bool check() override;
+
+  nebula::cpp2::ErrorCode prepare() override;
+
+  folly::Future<Status> executeInternal(HostAddr&& address,
+                                        std::vector<PartitionID>&& parts) override;
+
+ private:
+  std::set<std::string> storageHosts_;
+  std::vector<std::string> taskParameters_;
+};
+
+}  // namespace meta
+}  // namespace nebula
+
+#endif  // META_INGESTJOBEXECUTOR_H_
diff --git a/src/meta/processors/job/JobDescription.h b/src/meta/processors/job/JobDescription.h
index 3ae3ffa786c..a1328aa8d9a 100644
--- a/src/meta/processors/job/JobDescription.h
+++ b/src/meta/processors/job/JobDescription.h
@@ -3,8 +3,8 @@
  * This source code is licensed under Apache 2.0 License.
*/ -#ifndef META_KVJOBDESCRIPTION_H_ -#define META_KVJOBDESCRIPTION_H_ +#ifndef META_JOBDESCRIPTION_H_ +#define META_JOBDESCRIPTION_H_ #include @@ -182,4 +182,4 @@ class JobDescription { } // namespace meta } // namespace nebula -#endif // META_KVJOBDESCRIPTION_H_ +#endif // META_JOBDESCRIPTION_H_ diff --git a/src/meta/processors/job/JobExecutor.cpp b/src/meta/processors/job/JobExecutor.cpp index 41c5cfca5c4..1a11a221e36 100644 --- a/src/meta/processors/job/JobExecutor.cpp +++ b/src/meta/processors/job/JobExecutor.cpp @@ -14,7 +14,9 @@ #include "meta/processors/admin/AdminClient.h" #include "meta/processors/job/CompactJobExecutor.h" #include "meta/processors/job/DataBalanceJobExecutor.h" +#include "meta/processors/job/DownloadJobExecutor.h" #include "meta/processors/job/FlushJobExecutor.h" +#include "meta/processors/job/IngestJobExecutor.h" #include "meta/processors/job/LeaderBalanceJobExecutor.h" #include "meta/processors/job/RebuildEdgeJobExecutor.h" #include "meta/processors/job/RebuildFTJobExecutor.h" @@ -48,6 +50,16 @@ std::unique_ptr JobExecutorFactory::createJobExecutor(const JobDesc case cpp2::JobType::COMPACT: ret.reset(new CompactJobExecutor(jd.getSpace(), jd.getJobId(), store, client, jd.getParas())); break; + case cpp2::JobType::FLUSH: + ret.reset(new FlushJobExecutor(jd.getSpace(), jd.getJobId(), store, client, jd.getParas())); + break; + case cpp2::JobType::DOWNLOAD: + ret.reset( + new DownloadJobExecutor(jd.getSpace(), jd.getJobId(), store, client, jd.getParas())); + break; + case cpp2::JobType::INGEST: + ret.reset(new IngestJobExecutor(jd.getSpace(), jd.getJobId(), store, client, jd.getParas())); + break; case cpp2::JobType::DATA_BALANCE: ret.reset(new DataBalanceJobExecutor(jd, store, client, jd.getParas())); break; @@ -58,9 +70,6 @@ std::unique_ptr JobExecutorFactory::createJobExecutor(const JobDesc ret.reset( new LeaderBalanceJobExecutor(jd.getSpace(), jd.getJobId(), store, client, jd.getParas())); break; - case cpp2::JobType::FLUSH: - ret.reset(new FlushJobExecutor(jd.getSpace(), jd.getJobId(), store, client, jd.getParas())); - break; case cpp2::JobType::REBUILD_TAG_INDEX: ret.reset( new RebuildTagJobExecutor(jd.getSpace(), jd.getJobId(), store, client, jd.getParas())); diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index a1d7f928285..d6967693e98 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -121,7 +121,7 @@ void JobManager::scheduleThread() { std::tuple opJobId; while (!tryDequeue(opJobId)) { if (status_.load(std::memory_order_acquire) == JbmgrStatus::STOPPED) { - LOG(INFO) << "[JobManager] detect shutdown called, exit"; + LOG(INFO) << "Detect shutdown called, exit"; break; } usleep(FLAGS_job_check_intervals); @@ -133,12 +133,12 @@ void JobManager::scheduleThread() { std::lock_guard lk(muJobFinished_[spaceId]); auto jobDescRet = JobDescription::loadJobDescription(spaceId, jodId, kvStore_); if (!nebula::ok(jobDescRet)) { - LOG(INFO) << "[JobManager] load an invalid job from space " << spaceId << " jodId " << jodId; + LOG(INFO) << "Load an invalid job from space " << spaceId << " jodId " << jodId; continue; // leader change or archive happened } auto jobDesc = nebula::value(jobDescRet); if (!jobDesc.setStatus(cpp2::JobStatus::RUNNING, jobOp == JbOp::RECOVER)) { - LOG(INFO) << "[JobManager] skip job space " << spaceId << " jodId " << jodId; + LOG(INFO) << "Skip job space " << spaceId << " jodId " << jodId; continue; } @@ -158,8 +158,7 @@ void JobManager::scheduleThread() { 
} bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) { - std::unique_ptr je = - JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_); + auto je = JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_); JobExecutor* jobExec = je.get(); runningJobs_.emplace(jobDesc.getJobId(), std::move(je)); @@ -370,7 +369,7 @@ nebula::cpp2::ErrorCode JobManager::reportTaskFinish(const cpp2::ReportTaskReq& }); if (task == tasks.end()) { LOG(INFO) << folly::sformat( - "report an invalid or outdate task, will ignore this report, job={}, " + "Report an invalid or outdate task, will ignore this report, job={}, " "task={}", jobId, taskId); @@ -517,7 +516,7 @@ ErrorOr> JobManager::showJob // skip expired job, default 1 week if (isExpiredJob(optJob)) { lastExpiredJobId = optJob.getJobId(); - LOG(INFO) << "remove expired job " << lastExpiredJobId; + LOG(INFO) << "Remove expired job " << lastExpiredJobId; expiredJobKeys.emplace_back(jobKey); continue; } @@ -701,7 +700,7 @@ ErrorOr JobManager::recoverJob( nebula::cpp2::ErrorCode JobManager::save(const std::string& k, const std::string& v) { std::vector data{std::make_pair(k, v)}; folly::Baton baton; - auto rc = nebula::cpp2::ErrorCode::SUCCEEDED; + nebula::cpp2::ErrorCode rc = nebula::cpp2::ErrorCode::SUCCEEDED; kvStore_->asyncMultiPut( kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { rc = code; diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 50cf1243e73..22371b4df8b 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -41,6 +41,8 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { FRIEND_TEST(JobManagerTest, RecoverJob); FRIEND_TEST(JobManagerTest, AddRebuildTagIndexJob); FRIEND_TEST(JobManagerTest, AddRebuildEdgeIndexJob); + FRIEND_TEST(JobManagerTest, DownloadJob); + FRIEND_TEST(JobManagerTest, IngestJob); FRIEND_TEST(GetStatsTest, StatsJob); FRIEND_TEST(GetStatsTest, MockSingleMachineTest); FRIEND_TEST(GetStatsTest, MockMultiMachineTest); diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index b9cc94d1bf3..3d20767e90b 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -4,15 +4,19 @@ */ #include +#include #include #include "common/base/Base.h" #include "common/fs/TempDir.h" #include "kvstore/Common.h" #include "meta/ActiveHostsMan.h" +#include "meta/processors/job/DownloadJobExecutor.h" +#include "meta/processors/job/IngestJobExecutor.h" #include "meta/processors/job/JobManager.h" #include "meta/processors/job/TaskDescription.h" #include "meta/test/MockAdminClient.h" +#include "meta/test/MockHdfsHelper.h" #include "meta/test/TestUtils.h" #include "webservice/WebService.h" @@ -21,8 +25,15 @@ DECLARE_int32(ws_storage_http_port); namespace nebula { namespace meta { +using ::testing::_; +using ::testing::AtLeast; +using ::testing::ByMove; using ::testing::DefaultValue; +using ::testing::NaggyMock; using ::testing::NiceMock; +using ::testing::Return; +using ::testing::SetArgPointee; +using ::testing::StrictMock; class JobManagerTest : public ::testing::Test { protected: @@ -113,6 +124,54 @@ TEST_F(JobManagerTest, AddRebuildEdgeIndexJob) { ASSERT_TRUE(result); } +TEST_F(JobManagerTest, DownloadJob) { + auto rootPath = std::make_unique("/tmp/JobManagerTest.XXXXXX"); + mock::MockCluster cluster; + std::unique_ptr kv = cluster.initMetaKV(rootPath->path()); + 
ASSERT_TRUE(TestUtils::createSomeHosts(kv.get())); + TestUtils::assembleSpace(kv.get(), 1, 1); + std::vector paras{"hdfs://127.0.0.1:9000/test_space"}; + GraphSpaceID space = 1; + JobID jobId = 11; + JobDescription job(space, jobId, cpp2::JobType::DOWNLOAD, paras); + + MockAdminClient adminClient; + EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return(ByMove(folly::makeFuture(Status::OK())))); + + auto executor = + new DownloadJobExecutor(space, job.getJobId(), kv.get(), &adminClient, job.getParas()); + executor->helper_ = std::make_unique(); + + ASSERT_TRUE(executor->check()); + auto code = executor->prepare(); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); + code = executor->execute(); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); +} + +TEST_F(JobManagerTest, IngestJob) { + auto rootPath = std::make_unique("/tmp/DownloadAndIngestTest.XXXXXX"); + mock::MockCluster cluster; + std::unique_ptr kv = cluster.initMetaKV(rootPath->path()); + ASSERT_TRUE(TestUtils::createSomeHosts(kv.get())); + TestUtils::assembleSpace(kv.get(), 1, 1); + std::vector paras{"test_space"}; + GraphSpaceID space = 1; + JobID jobId = 11; + JobDescription job(space, jobId, cpp2::JobType::INGEST, paras); + + MockAdminClient adminClient; + auto executor = + new IngestJobExecutor(space, job.getJobId(), kv.get(), &adminClient, job.getParas()); + + ASSERT_TRUE(executor->check()); + auto code = executor->prepare(); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); + code = executor->execute(); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); +} + TEST_F(JobManagerTest, StatsJob) { std::unique_ptr> jobMgr = getJobManager(); // For preventing job schedule in JobManager @@ -368,7 +427,7 @@ TEST_F(JobManagerTest, ShowJob) { jd.setStatus(cpp2::JobStatus::FINISHED); jobMgr->addJob(jd, adminClient_.get()); - int32_t jobId2 = jd.getJobId(); + JobID jobId2 = jd.getJobId(); int32_t task1 = 0; auto host1 = toHost("127.0.0.1"); @@ -434,7 +493,7 @@ TEST_F(JobManagerTest, ShowJobInOtherSpace) { jd.setStatus(cpp2::JobStatus::FINISHED); jobMgr->addJob(jd, adminClient_.get()); - int32_t jobId2 = jd.getJobId(); + JobID jobId2 = jd.getJobId(); int32_t task1 = 0; auto host1 = toHost("127.0.0.1"); @@ -470,7 +529,6 @@ TEST_F(JobManagerTest, RecoverJob) { jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; jobMgr->bgThread_.join(); GraphSpaceID spaceId = 1; - int32_t nJob = 3; for (auto jobId = 0; jobId < nJob; ++jobId) { JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH); @@ -518,7 +576,7 @@ TEST(JobDescriptionTest, Ctor2) { TEST(JobDescriptionTest, ParseKey) { GraphSpaceID spaceId = 1; - int32_t jobId = std::pow(2, 16); + JobID jobId = std::pow(2, 16); JobDescription jd(spaceId, jobId, cpp2::JobType::COMPACT); ASSERT_EQ(jobId, jd.getJobId()); ASSERT_EQ(cpp2::JobType::COMPACT, jd.getJobType()); @@ -531,7 +589,7 @@ TEST(JobDescriptionTest, ParseKey) { TEST(JobDescriptionTest, ParseVal) { GraphSpaceID spaceId = 1; - int32_t jobId = std::pow(2, 15); + JobID jobId = std::pow(2, 15); JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH); auto status = cpp2::JobStatus::FINISHED; jd.setStatus(cpp2::JobStatus::RUNNING); @@ -558,8 +616,8 @@ TEST(JobDescriptionTest, ParseVal) { TEST(TaskDescriptionTest, Ctor) { GraphSpaceID spaceId = 1; - int32_t jobId = std::pow(2, 4); - int32_t taskId = 2; + JobID jobId = std::pow(2, 4); + TaskID taskId = 2; auto dest = toHost(""); TaskDescription td(spaceId, jobId, taskId, dest); auto status = cpp2::JobStatus::RUNNING; @@ -574,8 +632,8 @@ TEST(TaskDescriptionTest, 
Ctor) { TEST(TaskDescriptionTest, ParseKey) { GraphSpaceID spaceId = 1; - int32_t jobId = std::pow(2, 5); - int32_t taskId = 2; + JobID jobId = std::pow(2, 5); + TaskID taskId = 2; std::string dest{"127.0.0.1"}; TaskDescription td(spaceId, jobId, taskId, toHost(dest)); @@ -589,8 +647,8 @@ TEST(TaskDescriptionTest, ParseKey) { TEST(TaskDescriptionTest, ParseVal) { GraphSpaceID spaceId = 1; - int32_t jobId = std::pow(2, 5); - int32_t taskId = 3; + JobID jobId = std::pow(2, 5); + TaskID taskId = 3; std::string dest{"127.0.0.1"}; TaskDescription td(spaceId, jobId, taskId, toHost(dest)); diff --git a/src/meta/test/MockHdfsHelper.h b/src/meta/test/MockHdfsHelper.h index 9d911466efc..295bfbad92d 100644 --- a/src/meta/test/MockHdfsHelper.h +++ b/src/meta/test/MockHdfsHelper.h @@ -13,24 +13,15 @@ namespace meta { class MockHdfsOKHelper : public nebula::hdfs::HdfsHelper { public: - StatusOr ls(const std::string& hdfsHost, - int32_t hdfsPort, - const std::string& hdfsPath) override { - UNUSED(hdfsHost); - UNUSED(hdfsPort); - UNUSED(hdfsPath); + StatusOr ls(const std::string&, int32_t, const std::string&) override { sleep(1); return "total 2\n0000.sst\n000.sst"; } - StatusOr copyToLocal(const std::string& hdfsHost, - int32_t hdfsPort, - const std::string& hdfsPath, - const std::string& localPath) override { - UNUSED(hdfsHost); - UNUSED(hdfsPort); - UNUSED(hdfsPath); - UNUSED(localPath); + StatusOr copyToLocal(const std::string&, + int32_t, + const std::string&, + const std::string&) override { sleep(1); return ""; } @@ -42,22 +33,15 @@ class MockHdfsOKHelper : public nebula::hdfs::HdfsHelper { class MockHdfsNotExistHelper : public nebula::hdfs::HdfsHelper { public: - StatusOr ls(const std::string& hdfsHost, - int32_t hdfsPort, - const std::string& hdfsPath) override { - UNUSED(hdfsHost); - UNUSED(hdfsPort); + StatusOr ls(const std::string&, int32_t, const std::string& hdfsPath) override { sleep(1); return Status::Error(folly::stringPrintf("HDFS Path %s Not Exist", hdfsPath.c_str())); } - StatusOr copyToLocal(const std::string& hdfsHost, - int32_t hdfsPort, + StatusOr copyToLocal(const std::string&, + int32_t, const std::string& hdfsPath, - const std::string& localPath) override { - UNUSED(hdfsHost); - UNUSED(hdfsPort); - UNUSED(localPath); + const std::string&) override { sleep(1); return Status::Error(folly::stringPrintf("HDFS Path %s Not Exist", hdfsPath.c_str())); } diff --git a/src/parser/AdminSentences.cpp b/src/parser/AdminSentences.cpp index c9a1e2ff13c..b72735bb18a 100644 --- a/src/parser/AdminSentences.cpp +++ b/src/parser/AdminSentences.cpp @@ -258,10 +258,9 @@ std::string AdminJobSentence::toString() const { case meta::cpp2::JobType::STATS: return "SUBMIT JOB STATS"; case meta::cpp2::JobType::DOWNLOAD: - return paras_.empty() ? 
"DOWNLOAD HDFS " - : folly::stringPrintf("DOWNLOAD HDFS %s", paras_[0].c_str()); + return folly::stringPrintf("SUBMIT JOB DOWNLOAD HDFS \"%s\"", paras_[0].c_str()); case meta::cpp2::JobType::INGEST: - return "INGEST"; + return "SUBMIT JOB INGEST"; case meta::cpp2::JobType::DATA_BALANCE: if (paras_.empty()) { return "SUBMIT JOB BALANCE IN ZONE"; diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h index 2ece4c54405..23b7a8e0230 100644 --- a/src/parser/AdminSentences.h +++ b/src/parser/AdminSentences.h @@ -259,15 +259,6 @@ class SpaceOptItem final { } } - std::string getGroupName() const { - if (isString()) { - return asString(); - } else { - LOG(ERROR) << "group name value illegal."; - return ""; - } - } - OptionType getOptType() const { return optType_; } diff --git a/src/parser/MutateSentences.cpp b/src/parser/MutateSentences.cpp index 077d58d1b11..09d4596b339 100644 --- a/src/parser/MutateSentences.cpp +++ b/src/parser/MutateSentences.cpp @@ -283,13 +283,4 @@ std::string DeleteEdgesSentence::toString() const { return buf; } -std::string DownloadSentence::toString() const { - return folly::stringPrintf( - "DOWNLOAD HDFS \"hdfs://%s:%d%s\"", host_.get()->c_str(), port_, path_.get()->c_str()); -} - -std::string IngestSentence::toString() const { - return "INGEST"; -} - } // namespace nebula diff --git a/src/parser/MutateSentences.h b/src/parser/MutateSentences.h index 2fcc12ed31b..42347cb0b78 100644 --- a/src/parser/MutateSentences.h +++ b/src/parser/MutateSentences.h @@ -610,74 +610,5 @@ class DeleteEdgesSentence final : public Sentence { std::unique_ptr edgeKeyRef_; }; -class DownloadSentence final : public Sentence { - public: - DownloadSentence() { - kind_ = Kind::kDownload; - } - - const std::string *host() const { - return host_.get(); - } - - int32_t port() const { - return port_; - } - - void setPort(int32_t port) { - port_ = port; - } - - const std::string *path() const { - return path_.get(); - } - - void setUrl(std::string &url) { - static std::string hdfsPrefix = "hdfs://"; - if (url.find(hdfsPrefix) != 0) { - LOG(ERROR) << "URL should start with " << hdfsPrefix; - return; - } - - std::string u = url.substr(hdfsPrefix.size(), url.size()); - std::vector tokens; - folly::split(":", u, tokens); - if (tokens.size() == 2) { - host_ = std::make_unique(tokens[0]); - int32_t position = tokens[1].find_first_of("/"); - if (position != -1) { - try { - port_ = folly::to(tokens[1].toString().substr(0, position).c_str()); - } catch (const std::exception &ex) { - LOG(ERROR) << "URL's port parse failed: " << url; - return; - } - path_ = - std::make_unique(tokens[1].toString().substr(position, tokens[1].size())); - } else { - LOG(ERROR) << "URL Parse Failed: " << url; - } - } else { - LOG(ERROR) << "URL Parse Failed: " << url; - } - } - - std::string toString() const override; - - private: - std::unique_ptr host_; - int32_t port_; - std::unique_ptr path_; -}; - -class IngestSentence final : public Sentence { - public: - IngestSentence() { - kind_ = Kind::kIngest; - } - - std::string toString() const override; -}; - } // namespace nebula #endif // PARSER_MUTATESENTENCES_H_ diff --git a/src/parser/Sentence.h b/src/parser/Sentence.h index 9a10e5e2daa..18b998a4d65 100644 --- a/src/parser/Sentence.h +++ b/src/parser/Sentence.h @@ -97,8 +97,6 @@ class Sentence { kGrant, kRevoke, kChangePassword, - kDownload, - kIngest, kOrderBy, kShowConfigs, kSetConfig, diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 5abb2a1e95f..9147a668c30 100644 --- a/src/parser/parser.yy +++ 
b/src/parser/parser.yy @@ -3237,10 +3237,18 @@ delete_tag_sentence download_sentence : KW_DOWNLOAD KW_HDFS STRING { - auto sentence = new DownloadSentence(); - sentence->setUrl(*$3); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::DOWNLOAD); + sentence->addPara(*$3); + $$ = sentence; + } + ; + +ingest_sentence + : KW_INGEST { + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::INGEST); $$ = sentence; - delete $3; } ; @@ -3256,13 +3264,6 @@ delete_edge_sentence } ; -ingest_sentence - : KW_INGEST { - auto sentence = new IngestSentence(); - $$ = sentence; - } - ; - admin_job_sentence : KW_SUBMIT KW_JOB KW_COMPACT { auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, @@ -3274,6 +3275,17 @@ admin_job_sentence meta::cpp2::JobType::FLUSH); $$ = sentence; } + | KW_SUBMIT KW_JOB KW_DOWNLOAD KW_HDFS STRING { + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::DOWNLOAD); + sentence->addPara(*$5); + $$ = sentence; + } + | KW_SUBMIT KW_JOB KW_INGEST { + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::INGEST); + $$ = sentence; + } | KW_SUBMIT KW_JOB KW_STATS { auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, meta::cpp2::JobType::STATS); diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index 71112a058fb..4517c67761b 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -3220,6 +3220,12 @@ TEST_F(ParserTest, JobTest) { }; checkTest("SUBMIT JOB COMPACT", "SUBMIT JOB COMPACT"); checkTest("SUBMIT JOB FLUSH", "SUBMIT JOB FLUSH"); + + checkTest("SUBMIT JOB DOWNLOAD HDFS \"hdfs://127.0.0.1:9090/data\"", + "SUBMIT JOB DOWNLOAD HDFS \"hdfs://127.0.0.1:9090/data\""); + + checkTest("SUBMIT JOB INGEST", "SUBMIT JOB INGEST"); + checkTest("SUBMIT JOB STATS", "SUBMIT JOB STATS"); checkTest("SUBMIT JOB BALANCE IN ZONE", "SUBMIT JOB BALANCE IN ZONE"); checkTest( diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 55874b5cc30..86d4ab2758e 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -16,6 +16,8 @@ nebula_add_library( admin/AdminTask.cpp admin/CompactTask.cpp admin/FlushTask.cpp + admin/DownloadTask.cpp + admin/IngestTask.cpp admin/RebuildIndexTask.cpp admin/RebuildTagIndexTask.cpp admin/RebuildEdgeIndexTask.cpp @@ -64,8 +66,6 @@ nebula_add_library( nebula_add_library( storage_http_handler OBJECT - http/StorageHttpIngestHandler.cpp - http/StorageHttpDownloadHandler.cpp http/StorageHttpAdminHandler.cpp http/StorageHttpStatsHandler.cpp http/StorageHttpPropertyHandler.cpp diff --git a/src/storage/StorageServer.cpp b/src/storage/StorageServer.cpp index 90b4305877f..f2ed6487d49 100644 --- a/src/storage/StorageServer.cpp +++ b/src/storage/StorageServer.cpp @@ -27,8 +27,6 @@ #include "storage/StorageAdminServiceHandler.h" #include "storage/StorageFlags.h" #include "storage/http/StorageHttpAdminHandler.h" -#include "storage/http/StorageHttpDownloadHandler.h" -#include "storage/http/StorageHttpIngestHandler.h" #include "storage/http/StorageHttpPropertyHandler.h" #include "storage/http/StorageHttpStatsHandler.h" #include "storage/transaction/TransactionManager.h" @@ -104,16 +102,6 @@ bool StorageServer::initWebService() { webSvc_ = std::make_unique(); auto& router = webSvc_->router(); - router.get("/download").handler([this](web::PathParams&&) { - auto* handler = new storage::StorageHttpDownloadHandler(); - handler->init(hdfsHelper_.get(), webWorkers_.get(), 
kvstore_.get(), dataPaths_); - return handler; - }); - router.get("/ingest").handler([this](web::PathParams&&) { - auto handler = new nebula::storage::StorageHttpIngestHandler(); - handler->init(kvstore_.get()); - return handler; - }); router.get("/admin").handler([this](web::PathParams&&) { return new storage::StorageHttpAdminHandler(schemaMan_.get(), kvstore_.get()); }); diff --git a/src/storage/admin/AdminTask.cpp b/src/storage/admin/AdminTask.cpp index 90608c18f26..6f44d9595ac 100644 --- a/src/storage/admin/AdminTask.cpp +++ b/src/storage/admin/AdminTask.cpp @@ -6,7 +6,9 @@ #include "storage/admin/AdminTask.h" #include "storage/admin/CompactTask.h" +#include "storage/admin/DownloadTask.h" #include "storage/admin/FlushTask.h" +#include "storage/admin/IngestTask.h" #include "storage/admin/RebuildEdgeIndexTask.h" #include "storage/admin/RebuildFTIndexTask.h" #include "storage/admin/RebuildTagIndexTask.h" @@ -37,6 +39,12 @@ std::shared_ptr AdminTaskFactory::createAdminTask(StorageEnv* env, Ta case meta::cpp2::JobType::STATS: ret = std::make_shared(env, std::move(ctx)); break; + case meta::cpp2::JobType::DOWNLOAD: + ret = std::make_shared(env, std::move(ctx)); + break; + case meta::cpp2::JobType::INGEST: + ret = std::make_shared(env, std::move(ctx)); + break; default: break; } diff --git a/src/storage/admin/AdminTask.h b/src/storage/admin/AdminTask.h index 8e2ebbcb46f..a2d47fc357e 100644 --- a/src/storage/admin/AdminTask.h +++ b/src/storage/admin/AdminTask.h @@ -9,9 +9,6 @@ #include #include -#include "common/thrift/ThriftTypes.h" -#include "interface/gen-cpp2/meta_types.h" -#include "interface/gen-cpp2/storage_types.h" #include "kvstore/Common.h" #include "kvstore/NebulaStore.h" #include "storage/CommonUtils.h" @@ -19,6 +16,8 @@ namespace nebula { namespace storage { +using TaskFunction = std::function; + /** * @brief Subtask class for admin tasks. An admin task comprises a sequence of subtasks. * @@ -27,7 +26,7 @@ class AdminSubTask { public: AdminSubTask() = default; - explicit AdminSubTask(std::function f) : run_(f) {} + explicit AdminSubTask(TaskFunction f) : run_(f) {} /** * @brief Entry point to invoke sub tasks function. @@ -39,7 +38,7 @@ class AdminSubTask { } private: - std::function run_; + TaskFunction run_; }; enum class TaskPriority : int8_t { LO, MID, HI }; @@ -89,6 +88,12 @@ class AdminTask { virtual ~AdminTask() {} + /** + * @brief Check the argument + * + */ + virtual bool check() = 0; + /** * @brief Set the Callback object * @@ -172,7 +177,7 @@ class AdminTask { * @param rc Errorcode. 
*/ virtual void subTaskFinish(nebula::cpp2::ErrorCode rc) { - auto suc = nebula::cpp2::ErrorCode::SUCCEEDED; + nebula::cpp2::ErrorCode suc = nebula::cpp2::ErrorCode::SUCCEEDED; rc_.compare_exchange_strong(suc, rc); } @@ -183,7 +188,7 @@ class AdminTask { virtual void cancel() { FLOG_INFO("task(%d, %d) cancelled", ctx_.jobId_, ctx_.taskId_); canceled_ = true; - auto suc = nebula::cpp2::ErrorCode::SUCCEEDED; + nebula::cpp2::ErrorCode suc = nebula::cpp2::ErrorCode::SUCCEEDED; rc_.compare_exchange_strong(suc, nebula::cpp2::ErrorCode::E_USER_CANCEL); } diff --git a/src/storage/admin/AdminTaskProcessor.h b/src/storage/admin/AdminTaskProcessor.h index de3f3cdb7d5..76c0beda78e 100644 --- a/src/storage/admin/AdminTaskProcessor.h +++ b/src/storage/admin/AdminTaskProcessor.h @@ -7,7 +7,6 @@ #define STORAGE_ADMIN_ADMINTASKPROCESSOR_H_ #include "common/base/Base.h" -#include "common/thrift/ThriftTypes.h" #include "interface/gen-cpp2/meta_types.h" #include "kvstore/NebulaStore.h" #include "storage/BaseProcessor.h" diff --git a/src/storage/admin/CompactTask.cpp b/src/storage/admin/CompactTask.cpp index 913a6bc6924..a746d71c427 100644 --- a/src/storage/admin/CompactTask.cpp +++ b/src/storage/admin/CompactTask.cpp @@ -10,6 +10,10 @@ namespace nebula { namespace storage { +bool CompactTask::check() { + return env_->kvstore_ != nullptr; +} + ErrorOr> CompactTask::genSubTasks() { std::vector ret; if (!env_->kvstore_) { diff --git a/src/storage/admin/CompactTask.h b/src/storage/admin/CompactTask.h index e01bc2d0a8d..e9479ca2dc4 100644 --- a/src/storage/admin/CompactTask.h +++ b/src/storage/admin/CompactTask.h @@ -6,7 +6,6 @@ #ifndef STORAGE_ADMIN_COMPACTTASK_H_ #define STORAGE_ADMIN_COMPACTTASK_H_ -#include "common/thrift/ThriftTypes.h" #include "kvstore/KVEngine.h" #include "kvstore/NebulaStore.h" #include "storage/admin/AdminTask.h" @@ -22,6 +21,8 @@ class CompactTask : public AdminTask { public: CompactTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) {} + bool check() override; + /** * @brief Generate subtasks for compact. * diff --git a/src/storage/admin/DownloadTask.cpp b/src/storage/admin/DownloadTask.cpp new file mode 100644 index 00000000000..cca84d1a759 --- /dev/null +++ b/src/storage/admin/DownloadTask.cpp @@ -0,0 +1,59 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +#include "storage/admin/DownloadTask.h" + +#include "common/fs/FileUtils.h" + +namespace nebula { +namespace storage { + +bool DownloadTask::check() { + return env_->kvstore_ != nullptr; +} + +ErrorOr> DownloadTask::genSubTasks() { + auto space = *ctx_.parameters_.space_id_ref(); + auto parts = *ctx_.parameters_.parts_ref(); + auto paras = ctx_.parameters_.task_specific_paras_ref(); + if (!paras.has_value() || paras->size() != 3) { + LOG(ERROR) << "Download Task should be three parameters"; + return nebula::cpp2::ErrorCode::E_INVALID_PARM; + } + + hdfsHost_ = (*paras)[0]; + hdfsPort_ = folly::to((*paras)[1]); + hdfsPath_ = (*paras)[2]; + std::vector tasks; + for (const auto& part : parts) { + TaskFunction task = std::bind(&DownloadTask::subTask, this, space, part); + tasks.emplace_back(std::move(task)); + } + return tasks; +} + +nebula::cpp2::ErrorCode DownloadTask::subTask(GraphSpaceID space, PartitionID part) { + LOG(INFO) << "Space: " << space << " Part: " << part; + auto hdfsPartPath = folly::stringPrintf("%s/%d", hdfsPath_.c_str(), part); + auto partResult = env_->kvstore_->part(space, part); + if (!ok(partResult)) { + LOG(ERROR) << "Can't found space: " << space << ", part: " << part; + return nebula::cpp2::ErrorCode::E_PART_NOT_FOUND; + } + + auto localPath = folly::stringPrintf("%s/download/", value(partResult)->engine()->getDataRoot()); + if (fs::FileUtils::fileType(localPath.c_str()) == fs::FileType::NOTEXIST) { + if (!fs::FileUtils::makeDir(localPath)) { + return nebula::cpp2::ErrorCode::E_TASK_EXECUTION_FAILED; + } + } + + auto result = helper_->copyToLocal(hdfsHost_, hdfsPort_, hdfsPartPath, localPath); + return result.ok() ? nebula::cpp2::ErrorCode::SUCCEEDED + : nebula::cpp2::ErrorCode::E_TASK_EXECUTION_FAILED; +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/admin/DownloadTask.h b/src/storage/admin/DownloadTask.h new file mode 100644 index 00000000000..2b3f8785931 --- /dev/null +++ b/src/storage/admin/DownloadTask.h @@ -0,0 +1,42 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef STORAGE_ADMIN_DOWNLOADTASK_H_ +#define STORAGE_ADMIN_DOWNLOADTASK_H_ + +#include "common/hdfs/HdfsCommandHelper.h" +#include "storage/admin/AdminTask.h" + +namespace nebula { +namespace storage { + +/** + * @brief Task class to handle storage download task. 
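+ *
+ * The job carries three task-specific parameters (HDFS host, port and path). genSubTasks()
+ * builds one sub task per partition; each sub task copies <hdfsPath>/<partId> from HDFS
+ * into the engine's local download directory through the HDFS helper's copyToLocal().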
+ * + */ +class DownloadTask : public AdminTask { + public: + DownloadTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) { + helper_ = std::make_unique(); + } + + bool check() override; + + ErrorOr> genSubTasks() override; + + private: + nebula::cpp2::ErrorCode subTask(GraphSpaceID space, PartitionID part); + + private: + std::string hdfsPath_; + std::string hdfsHost_; + int32_t hdfsPort_; + std::unique_ptr helper_; +}; + +} // namespace storage +} // namespace nebula + +#endif // STORAGE_ADMIN_DOWNLOADTASK_H_ diff --git a/src/storage/admin/FlushTask.cpp b/src/storage/admin/FlushTask.cpp index 6d9f4be9bac..6f6ff86e85e 100644 --- a/src/storage/admin/FlushTask.cpp +++ b/src/storage/admin/FlushTask.cpp @@ -10,12 +10,12 @@ namespace nebula { namespace storage { +bool FlushTask::check() { + return env_->kvstore_ != nullptr; +} + ErrorOr> FlushTask::genSubTasks() { std::vector ret; - if (!env_->kvstore_) { - return ret; - } - auto* store = dynamic_cast(env_->kvstore_); auto errOrSpace = store->space(*ctx_.parameters_.space_id_ref()); if (!ok(errOrSpace)) { diff --git a/src/storage/admin/FlushTask.h b/src/storage/admin/FlushTask.h index 243dda1e8c4..ea83a6fecf9 100644 --- a/src/storage/admin/FlushTask.h +++ b/src/storage/admin/FlushTask.h @@ -6,7 +6,6 @@ #ifndef STORAGE_ADMIN_FLUSHTASK_H_ #define STORAGE_ADMIN_FLUSHTASK_H_ -#include "common/thrift/ThriftTypes.h" #include "kvstore/KVEngine.h" #include "kvstore/NebulaStore.h" #include "storage/admin/AdminTask.h" @@ -21,6 +20,9 @@ namespace storage { class FlushTask : public AdminTask { public: FlushTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) {} + + bool check() override; + /** * @brief Generage subtasks for flushing. * diff --git a/src/storage/admin/IngestTask.cpp b/src/storage/admin/IngestTask.cpp new file mode 100644 index 00000000000..e8ce86e9801 --- /dev/null +++ b/src/storage/admin/IngestTask.cpp @@ -0,0 +1,50 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "storage/admin/IngestTask.h" + +#include "common/fs/FileUtils.h" + +namespace nebula { +namespace storage { + +bool IngestTask::check() { + return env_->kvstore_ != nullptr; +} + +ErrorOr> IngestTask::genSubTasks() { + std::vector results; + auto* store = dynamic_cast(env_->kvstore_); + auto errOrSpace = store->space(*ctx_.parameters_.space_id_ref()); + if (!ok(errOrSpace)) { + LOG(ERROR) << "Space not found"; + return error(errOrSpace); + } + + auto space = nebula::value(errOrSpace); + results.emplace_back([space = space]() { + for (auto& engine : space->engines_) { + auto parts = engine->allParts(); + for (auto part : parts) { + auto path = folly::stringPrintf("%s/download/%d", engine->getDataRoot(), part); + if (!fs::FileUtils::exist(path)) { + LOG(INFO) << path << " not existed"; + continue; + } + + auto files = nebula::fs::FileUtils::listAllFilesInDir(path.c_str(), true, "*.sst"); + auto code = engine->ingest(std::vector(files)); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + return code; + } + } + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }); + return results; +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/admin/IngestTask.h b/src/storage/admin/IngestTask.h new file mode 100644 index 00000000000..e46754ac3cb --- /dev/null +++ b/src/storage/admin/IngestTask.h @@ -0,0 +1,30 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +#ifndef STORAGE_ADMIN_INGESTTASK_H_ +#define STORAGE_ADMIN_INGESTTASK_H_ + +#include "storage/admin/AdminTask.h" + +namespace nebula { +namespace storage { + +/** + * @brief Task class to handle storage ingest task. + * + */ +class IngestTask : public AdminTask { + public: + IngestTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) {} + + bool check() override; + + ErrorOr> genSubTasks() override; +}; + +} // namespace storage +} // namespace nebula + +#endif // STORAGE_ADMIN_INGESTTASK_H_ diff --git a/src/storage/admin/RebuildEdgeIndexTask.cpp b/src/storage/admin/RebuildEdgeIndexTask.cpp index 0e14e2f1068..78fa54d0286 100644 --- a/src/storage/admin/RebuildEdgeIndexTask.cpp +++ b/src/storage/admin/RebuildEdgeIndexTask.cpp @@ -48,8 +48,8 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac LOG(INFO) << "Get space edge schema failed"; return nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND; } - auto schemas = schemasRet.value(); + auto schemas = schemasRet.value(); auto vidSize = vidSizeRet.value(); std::unique_ptr iter; const auto& prefix = NebulaKeyUtils::edgePrefix(part); @@ -114,8 +114,8 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac iter->next(); continue; } - auto* schema = schemaIter->second.get(); + auto* schema = schemaIter->second.get(); auto ttlProp = CommonUtils::ttlProps(schema); if (ttlProp.first && CommonUtils::checkDataExpiredForTTL( schema, reader.get(), ttlProp.second.second, ttlProp.second.first)) { diff --git a/src/storage/admin/RebuildFTIndexTask.cpp b/src/storage/admin/RebuildFTIndexTask.cpp index bffb59b5d2b..fc8af8989ea 100644 --- a/src/storage/admin/RebuildFTIndexTask.cpp +++ b/src/storage/admin/RebuildFTIndexTask.cpp @@ -12,6 +12,10 @@ DECLARE_uint32(raft_heartbeat_interval_secs); namespace nebula { namespace storage { +bool RebuildFTIndexTask::check() { + return env_->kvstore_ != nullptr; +} + ErrorOr> RebuildFTIndexTask::genSubTasks() { std::vector tasks; VLOG(1) << "Begin rebuild fulltext indexes, space : " << *ctx_.parameters_.space_id_ref(); @@ -47,8 +51,7 @@ ErrorOr> RebuildFTIndexTask:: VLOG(1) << folly::sformat("Processing fulltext rebuild subtask, space={}, part={}", *ctx_.parameters_.space_id_ref(), part); - std::function task = - std::bind(&RebuildFTIndexTask::taskByPart, this, listener); + TaskFunction task = std::bind(&RebuildFTIndexTask::taskByPart, this, listener); tasks.emplace_back(std::move(task)); } return tasks; diff --git a/src/storage/admin/RebuildFTIndexTask.h b/src/storage/admin/RebuildFTIndexTask.h index 528e508dfaf..72835b8fd7f 100644 --- a/src/storage/admin/RebuildFTIndexTask.h +++ b/src/storage/admin/RebuildFTIndexTask.h @@ -6,7 +6,6 @@ #ifndef STORAGE_ADMIN_REBUILDFTINDEXTASK_H_ #define STORAGE_ADMIN_REBUILDFTINDEXTASK_H_ -#include "common/thrift/ThriftTypes.h" #include "kvstore/KVEngine.h" #include "kvstore/NebulaStore.h" #include "storage/admin/AdminTask.h" @@ -21,6 +20,9 @@ namespace storage { class RebuildFTIndexTask : public AdminTask { public: RebuildFTIndexTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) {} + + bool check() override; + /** * @brief Generate subtasks for rebuilding FT index. 
* diff --git a/src/storage/admin/RebuildIndexTask.cpp b/src/storage/admin/RebuildIndexTask.cpp index ca1988873b4..d3e883e27ba 100644 --- a/src/storage/admin/RebuildIndexTask.cpp +++ b/src/storage/admin/RebuildIndexTask.cpp @@ -14,6 +14,10 @@ namespace storage { const int32_t kReserveNum = 1024 * 4; +bool RebuildIndexTask::check() { + return env_->kvstore_ != nullptr; +} + RebuildIndexTask::RebuildIndexTask(StorageEnv* env, TaskContext&& ctx) : AdminTask(env, std::move(ctx)) { // Rebuild index rate is limited to FLAGS_rebuild_index_part_rate_limit * SubTaskConcurrency. As @@ -26,7 +30,6 @@ RebuildIndexTask::RebuildIndexTask(StorageEnv* env, TaskContext&& ctx) } ErrorOr> RebuildIndexTask::genSubTasks() { - CHECK_NOTNULL(env_->kvstore_); space_ = *ctx_.parameters_.space_id_ref(); auto parts = *ctx_.parameters_.parts_ref(); @@ -67,8 +70,7 @@ ErrorOr> RebuildIndexTask::ge for (const auto& part : parts) { env_->rebuildIndexGuard_->insert_or_assign(std::make_tuple(space_, part), IndexState::STARTING); - std::function task = - std::bind(&RebuildIndexTask::invoke, this, space_, part, items); + TaskFunction task = std::bind(&RebuildIndexTask::invoke, this, space_, part, items); tasks.emplace_back(std::move(task)); } return tasks; diff --git a/src/storage/admin/RebuildIndexTask.h b/src/storage/admin/RebuildIndexTask.h index 6c889855cb6..2f3eac072b9 100644 --- a/src/storage/admin/RebuildIndexTask.h +++ b/src/storage/admin/RebuildIndexTask.h @@ -29,6 +29,8 @@ class RebuildIndexTask : public AdminTask { LOG(INFO) << "Release Rebuild Task"; } + bool check() override; + /** * @brief Generate subtasks for rebuilding index. * diff --git a/src/storage/admin/StatsTask.cpp b/src/storage/admin/StatsTask.cpp index acb2ae89681..8008df3173d 100644 --- a/src/storage/admin/StatsTask.cpp +++ b/src/storage/admin/StatsTask.cpp @@ -14,8 +14,11 @@ namespace nebula { namespace storage { +bool StatsTask::check() { + return env_->kvstore_ != nullptr && env_->schemaMan_ != nullptr; +} + nebula::cpp2::ErrorCode StatsTask::getSchemas(GraphSpaceID spaceId) { - CHECK_NOTNULL(env_->schemaMan_); auto tags = env_->schemaMan_->getAllVerTagSchema(spaceId); if (!tags.ok()) { return nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND; @@ -61,8 +64,7 @@ ErrorOr> StatsTask::genSubTas std::vector tasks; for (const auto& part : parts) { - std::function task = - std::bind(&StatsTask::genSubTask, this, spaceId_, part, tags_, edges_); + TaskFunction task = std::bind(&StatsTask::genSubTask, this, spaceId_, part, tags_, edges_); tasks.emplace_back(std::move(task)); } return tasks; @@ -100,7 +102,6 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId, auto partitionNum = partitionNumRet.value(); LOG(INFO) << "Start stats task"; - CHECK_NOTNULL(env_->kvstore_); auto tagPrefix = NebulaKeyUtils::tagPrefix(part); std::unique_ptr tagIter; auto edgePrefix = NebulaKeyUtils::edgePrefix(part); diff --git a/src/storage/admin/StatsTask.h b/src/storage/admin/StatsTask.h index 9de607e151e..5b618fc0d63 100644 --- a/src/storage/admin/StatsTask.h +++ b/src/storage/admin/StatsTask.h @@ -6,7 +6,6 @@ #ifndef STORAGE_ADMIN_STATSTASK_H_ #define STORAGE_ADMIN_STATSTASK_H_ -#include "common/thrift/ThriftTypes.h" #include "interface/gen-cpp2/meta_types.h" #include "kvstore/KVEngine.h" #include "kvstore/NebulaStore.h" @@ -28,6 +27,8 @@ class StatsTask : public AdminTask { LOG(INFO) << "Release Stats Task"; } + bool check() override; + /** * @brief Generate sub tasks for StatsTask. 
* diff --git a/src/storage/admin/StopAdminTaskProcessor.h b/src/storage/admin/StopAdminTaskProcessor.h index b8363283916..b3418e50041 100644 --- a/src/storage/admin/StopAdminTaskProcessor.h +++ b/src/storage/admin/StopAdminTaskProcessor.h @@ -7,7 +7,6 @@ #define STORAGE_ADMIN_STOPADMINTASKPROCESSOR_H_ #include "common/base/Base.h" -#include "common/thrift/ThriftTypes.h" #include "kvstore/NebulaStore.h" #include "storage/BaseProcessor.h" #include "storage/StorageFlags.h" diff --git a/src/storage/http/StorageHttpDownloadHandler.cpp b/src/storage/http/StorageHttpDownloadHandler.cpp deleted file mode 100644 index ed15bc77f50..00000000000 --- a/src/storage/http/StorageHttpDownloadHandler.cpp +++ /dev/null @@ -1,197 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "storage/http/StorageHttpDownloadHandler.h" - -#include -#include -#include - -#include "common/fs/FileUtils.h" -#include "common/hdfs/HdfsHelper.h" -#include "common/process/ProcessUtils.h" -#include "kvstore/Part.h" -#include "webservice/Common.h" - -DEFINE_int32(download_thread_num, 3, "download thread number"); - -namespace nebula { -namespace storage { - -using proxygen::HTTPMessage; -using proxygen::HTTPMethod; -using proxygen::ProxygenError; -using proxygen::ResponseBuilder; -using proxygen::UpgradeProtocol; - -void StorageHttpDownloadHandler::init(nebula::hdfs::HdfsHelper* helper, - nebula::thread::GenericThreadPool* pool, - nebula::kvstore::KVStore* kvstore, - std::vector paths) { - helper_ = helper; - pool_ = pool; - kvstore_ = kvstore; - paths_ = paths; - CHECK_NOTNULL(helper_); - CHECK_NOTNULL(pool_); - CHECK_NOTNULL(kvstore_); - CHECK(!paths_.empty()); -} - -void StorageHttpDownloadHandler::onRequest(std::unique_ptr headers) noexcept { - if (headers->getMethod().value() != HTTPMethod::GET) { - // Unsupported method - err_ = HttpCode::E_UNSUPPORTED_METHOD; - return; - } - - if (!headers->hasQueryParam("host") || !headers->hasQueryParam("port") || - !headers->hasQueryParam("path") || !headers->hasQueryParam("parts") || - !headers->hasQueryParam("space")) { - LOG(ERROR) << "Illegal Argument"; - err_ = HttpCode::E_ILLEGAL_ARGUMENT; - return; - } - - hdfsHost_ = headers->getQueryParam("host"); - hdfsPort_ = headers->getIntQueryParam("port"); - hdfsPath_ = headers->getQueryParam("path"); - partitions_ = headers->getQueryParam("parts"); - spaceID_ = headers->getIntQueryParam("space"); - - for (auto& path : paths_) { - auto downloadPath = folly::stringPrintf("%s/nebula/%d/download", path.c_str(), spaceID_); - if (fs::FileUtils::fileType(downloadPath.c_str()) == fs::FileType::NOTEXIST) { - fs::FileUtils::makeDir(downloadPath); - } - } -} - -void StorageHttpDownloadHandler::onBody(std::unique_ptr) noexcept { - // Do nothing, we only support GET -} - -void StorageHttpDownloadHandler::onEOM() noexcept { - switch (err_) { - case HttpCode::E_UNSUPPORTED_METHOD: - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), - WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) - .sendWithEOM(); - return; - case HttpCode::E_ILLEGAL_ARGUMENT: - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::BAD_REQUEST), - WebServiceUtils::toString(HttpStatusCode::BAD_REQUEST)) - .sendWithEOM(); - return; - default: - break; - } - - if (helper_->checkHadoopPath()) { - std::vector parts; - folly::split(",", partitions_, parts, true); - if (parts.empty()) { - ResponseBuilder(downstream_) - 
.status(400, "SSTFile download failed") - .body("Partitions should be not empty") - .sendWithEOM(); - } - - if (downloadSSTFiles(hdfsHost_, hdfsPort_, hdfsPath_, parts)) { - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::OK), - WebServiceUtils::toString(HttpStatusCode::OK)) - .body("SSTFile download successfully") - .sendWithEOM(); - } else { - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::FORBIDDEN), - WebServiceUtils::toString(HttpStatusCode::FORBIDDEN)) - .body("SSTFile download failed") - .sendWithEOM(); - } - } else { - LOG(ERROR) << "HADOOP_HOME not exist"; - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::NOT_FOUND), - WebServiceUtils::toString(HttpStatusCode::NOT_FOUND)) - .sendWithEOM(); - } -} - -void StorageHttpDownloadHandler::onUpgrade(UpgradeProtocol) noexcept { - // Do nothing -} - -void StorageHttpDownloadHandler::requestComplete() noexcept { - delete this; -} - -void StorageHttpDownloadHandler::onError(ProxygenError error) noexcept { - LOG(ERROR) << "Web Service StorageHttpDownloadHandler got error: " - << proxygen::getErrorString(error); -} - -bool StorageHttpDownloadHandler::downloadSSTFiles(const std::string& hdfsHost, - int32_t hdfsPort, - const std::string& hdfsPath, - const std::vector& parts) { - static std::atomic_flag isRunning = ATOMIC_FLAG_INIT; - if (isRunning.test_and_set()) { - LOG(ERROR) << "Download is not completed"; - return false; - } - - std::vector> futures; - - for (auto& part : parts) { - PartitionID partId; - try { - partId = folly::to(part); - } catch (const std::exception& ex) { - isRunning.clear(); - LOG(ERROR) << "Invalid part: \"" << part << "\""; - return false; - } - - auto downloader = [hdfsHost, hdfsPort, hdfsPath, partId, this]() { - auto hdfsPartPath = folly::stringPrintf("%s/%d", hdfsPath.c_str(), partId); - auto partResult = kvstore_->part(spaceID_, partId); - if (!ok(partResult)) { - LOG(ERROR) << "Can't found space: " << spaceID_ << ", part: " << partId; - return false; - } - - auto localPath = - folly::stringPrintf("%s/download/", value(partResult)->engine()->getDataRoot()); - auto result = this->helper_->copyToLocal(hdfsHost, hdfsPort, hdfsPartPath, localPath); - return result.ok() && result.value().empty(); - }; - auto future = pool_->addTask(downloader); - futures.push_back(std::move(future)); - } - - bool successfully{true}; - auto tries = folly::collectAll(futures).get(); - for (const auto& t : tries) { - if (t.hasException()) { - LOG(ERROR) << "Download Failed: " << t.exception(); - successfully = false; - break; - } - if (!t.value()) { - successfully = false; - break; - } - } - LOG(INFO) << "Download tasks have finished"; - isRunning.clear(); - return successfully; -} - -} // namespace storage -} // namespace nebula diff --git a/src/storage/http/StorageHttpDownloadHandler.h b/src/storage/http/StorageHttpDownloadHandler.h deleted file mode 100644 index 1a4d8656d22..00000000000 --- a/src/storage/http/StorageHttpDownloadHandler.h +++ /dev/null @@ -1,69 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#ifndef STORAGE_HTTP_STORAGEHTTPDOWNLOADHANDLER_H_ -#define STORAGE_HTTP_STORAGEHTTPDOWNLOADHANDLER_H_ - -#include - -#include "common/base/Base.h" -#include "common/hdfs/HdfsHelper.h" -#include "common/thread/GenericThreadPool.h" -#include "kvstore/KVStore.h" -#include "webservice/Common.h" - -namespace nebula { -namespace storage { - -using nebula::HttpCode; - -/** - * @brief down load files from hdfs - * - */ -class StorageHttpDownloadHandler : public proxygen::RequestHandler { - public: - StorageHttpDownloadHandler() = default; - - void init(nebula::hdfs::HdfsHelper *helper, - nebula::thread::GenericThreadPool *pool, - nebula::kvstore::KVStore *kvstore, - std::vector paths); - - void onRequest(std::unique_ptr headers) noexcept override; - - void onBody(std::unique_ptr body) noexcept override; - - void onEOM() noexcept override; - - void onUpgrade(proxygen::UpgradeProtocol protocol) noexcept override; - - void requestComplete() noexcept override; - - void onError(proxygen::ProxygenError error) noexcept override; - - private: - bool downloadSSTFiles(const std::string &url, - int port, - const std::string &path, - const std::vector &parts); - - private: - HttpCode err_{HttpCode::SUCCEEDED}; - GraphSpaceID spaceID_; - std::string hdfsHost_; - int32_t hdfsPort_; - std::string hdfsPath_; - std::string partitions_; - nebula::hdfs::HdfsHelper *helper_; - nebula::thread::GenericThreadPool *pool_; - nebula::kvstore::KVStore *kvstore_; - std::vector paths_; -}; - -} // namespace storage -} // namespace nebula - -#endif // STORAGE_HTTP_STORAGEHTTPDOWNLOADHANDLER_H_ diff --git a/src/storage/http/StorageHttpIngestHandler.cpp b/src/storage/http/StorageHttpIngestHandler.cpp deleted file mode 100644 index 8c44cf4bf72..00000000000 --- a/src/storage/http/StorageHttpIngestHandler.cpp +++ /dev/null @@ -1,103 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#include "storage/http/StorageHttpIngestHandler.h" - -#include -#include -#include - -namespace nebula { -namespace storage { - -using proxygen::HTTPMessage; -using proxygen::HTTPMethod; -using proxygen::ProxygenError; -using proxygen::ResponseBuilder; -using proxygen::UpgradeProtocol; - -void StorageHttpIngestHandler::init(nebula::kvstore::KVStore *kvstore) { - kvstore_ = kvstore; - CHECK_NOTNULL(kvstore_); -} - -void StorageHttpIngestHandler::onRequest(std::unique_ptr headers) noexcept { - if (headers->getMethod().value() != HTTPMethod::GET) { - // Unsupported method - err_ = HttpCode::E_UNSUPPORTED_METHOD; - return; - } - - if (!headers->hasQueryParam("space")) { - err_ = HttpCode::E_ILLEGAL_ARGUMENT; - return; - } - - space_ = headers->getIntQueryParam("space"); -} - -void StorageHttpIngestHandler::onBody(std::unique_ptr) noexcept { - // Do nothing, we only support GET -} - -void StorageHttpIngestHandler::onEOM() noexcept { - switch (err_) { - case HttpCode::E_UNSUPPORTED_METHOD: - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), - WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) - .sendWithEOM(); - return; - case HttpCode::E_ILLEGAL_ARGUMENT: - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::BAD_REQUEST), - WebServiceUtils::toString(HttpStatusCode::BAD_REQUEST)) - .sendWithEOM(); - return; - default: - break; - } - - if (ingestSSTFiles(space_)) { - LOG(ERROR) << "SSTFile ingest successfully "; - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::OK), - WebServiceUtils::toString(HttpStatusCode::OK)) - .body("SSTFile ingest successfully") - .sendWithEOM(); - } else { - LOG(ERROR) << "SSTFile ingest failed"; - ResponseBuilder(downstream_) - .status(WebServiceUtils::to(HttpStatusCode::FORBIDDEN), - WebServiceUtils::toString(HttpStatusCode::FORBIDDEN)) - .body("SSTFile ingest failed") - .sendWithEOM(); - } -} - -void StorageHttpIngestHandler::onUpgrade(UpgradeProtocol) noexcept { - // Do nothing -} - -void StorageHttpIngestHandler::requestComplete() noexcept { - delete this; -} - -void StorageHttpIngestHandler::onError(ProxygenError error) noexcept { - LOG(ERROR) << "Web Service MetaHttpIngestHandler Failed: " << proxygen::getErrorString(error); -} - -bool StorageHttpIngestHandler::ingestSSTFiles(GraphSpaceID space) { - auto code = kvstore_->ingest(space); - if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { - return true; - } else { - LOG(ERROR) << "SSTFile Ingest Failed: " << apache::thrift::util::enumNameSafe(code); - return false; - } -} - -} // namespace storage -} // namespace nebula diff --git a/src/storage/http/StorageHttpIngestHandler.h b/src/storage/http/StorageHttpIngestHandler.h deleted file mode 100644 index b3beebbb846..00000000000 --- a/src/storage/http/StorageHttpIngestHandler.h +++ /dev/null @@ -1,53 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef STORAGE_HTTP_STORAGEHTTPINGESTHANDLER_H_ -#define STORAGE_HTTP_STORAGEHTTPINGESTHANDLER_H_ - -#include - -#include "common/base/Base.h" -#include "kvstore/KVStore.h" -#include "webservice/Common.h" - -namespace nebula { -namespace storage { - -using nebula::HttpCode; - -/** - * @brief ingest from from downloaded file. 
- * - */ -class StorageHttpIngestHandler : public proxygen::RequestHandler { - public: - StorageHttpIngestHandler() = default; - - void init(nebula::kvstore::KVStore *kvstore); - - void onRequest(std::unique_ptr headers) noexcept override; - - void onBody(std::unique_ptr body) noexcept override; - - void onEOM() noexcept override; - - void onUpgrade(proxygen::UpgradeProtocol protocol) noexcept override; - - void requestComplete() noexcept override; - - void onError(proxygen::ProxygenError error) noexcept override; - - bool ingestSSTFiles(GraphSpaceID space); - - private: - HttpCode err_{HttpCode::SUCCEEDED}; - nebula::kvstore::KVStore *kvstore_; - GraphSpaceID space_; -}; - -} // namespace storage -} // namespace nebula - -#endif // STORAGE_HTTP_STORAGEHTTPINGESTHANDLER_H_ diff --git a/src/storage/test/AdminTaskManagerTest.cpp b/src/storage/test/AdminTaskManagerTest.cpp index 7173eaee892..a55590ea8ed 100644 --- a/src/storage/test/AdminTaskManagerTest.cpp +++ b/src/storage/test/AdminTaskManagerTest.cpp @@ -42,12 +42,17 @@ struct HookableTask : public AdminTask { HookableTask() { fGenSubTasks = [&]() { return subTasks; }; } + ErrOrSubTasks genSubTasks() override { LOG(INFO) << "HookableTask::genSubTasks() subTasks.size()=" << subTasks.size(); return fGenSubTasks(); } - void addSubTask(std::function subTask) { + bool check() override { + return true; + } + + void addSubTask(TaskFunction subTask) { subTasks.emplace_back(subTask); } diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 5810bf61560..c6923bf1fcb 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -1,4 +1,5 @@ set(storage_test_deps + $ $ $ $ @@ -461,40 +462,6 @@ nebula_add_test( gtest ) -nebula_add_test( - NAME - storage_http_download_test - SOURCES - StorageHttpDownloadHandlerTest.cpp - OBJECTS - $ - $ - ${storage_test_deps} - LIBRARIES - ${ROCKSDB_LIBRARIES} - ${THRIFT_LIBRARIES} - ${PROXYGEN_LIBRARIES} - wangle - gtest -) - -nebula_add_test( - NAME - storage_http_ingest_test - SOURCES - StorageHttpIngestHandlerTest.cpp - OBJECTS - $ - $ - ${storage_test_deps} - LIBRARIES - ${ROCKSDB_LIBRARIES} - ${THRIFT_LIBRARIES} - ${PROXYGEN_LIBRARIES} - wangle - gtest -) - nebula_add_test( NAME storage_http_property_test @@ -769,4 +736,3 @@ nebula_add_executable( wangle gtest ) - diff --git a/src/storage/test/StorageHttpDownloadHandlerTest.cpp b/src/storage/test/StorageHttpDownloadHandlerTest.cpp deleted file mode 100644 index ec88273affc..00000000000 --- a/src/storage/test/StorageHttpDownloadHandlerTest.cpp +++ /dev/null @@ -1,116 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#include - -#include "common/base/Base.h" -#include "common/fs/TempDir.h" -#include "common/http/HttpClient.h" -#include "mock/MockCluster.h" -#include "mock/MockData.h" -#include "storage/http/StorageHttpDownloadHandler.h" -#include "storage/test/MockHdfsHelper.h" -#include "storage/test/TestUtils.h" -#include "webservice/Router.h" -#include "webservice/WebService.h" - -DECLARE_string(meta_server_addrs); - -namespace nebula { -namespace storage { - -std::unique_ptr helper = std::make_unique(); - -class StorageHttpDownloadHandlerTestEnv : public ::testing::Environment { - public: - void SetUp() override { - FLAGS_ws_ip = "127.0.0.1"; - FLAGS_ws_http_port = 0; - - rootPath_ = std::make_unique("/tmp/StorageHttpDownloadHandler.XXXXXX"); - cluster_ = std::make_unique(); - cluster_->initStorageKV(rootPath_->path()); - - pool_ = std::make_unique(); - pool_->start(1); - - VLOG(1) << "Starting web service..."; - webSvc_ = std::make_unique(); - auto& router = webSvc_->router(); - router.get("/download").handler([this](nebula::web::PathParams&&) { - auto handler = new storage::StorageHttpDownloadHandler(); - std::vector paths{rootPath_->path()}; - handler->init(helper.get(), pool_.get(), cluster_->storageEnv_->kvstore_, paths); - return handler; - }); - auto status = webSvc_->start(); - ASSERT_TRUE(status.ok()) << status; - } - - void TearDown() override { - cluster_.reset(); - rootPath_.reset(); - webSvc_.reset(); - pool_->stop(); - VLOG(1) << "Web service stopped"; - } - - private: - std::unique_ptr cluster_; - std::unique_ptr webSvc_; - std::unique_ptr rootPath_; - std::unique_ptr pool_; -}; - -TEST(StorageHttpDownloadHandlerTest, StorageDownloadTest) { - { - auto url = "/download"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_TRUE(resp.value().empty()); - } - { - auto url = "/download?host=127.0.0.1&port=9000&path=/data&parts=1&space=1"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile download successfully", resp.value()); - } - { - auto url = - "/download?host=127.0.0.1&port=9000&path=/" - "data&parts=illegal-part&space=1"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile download failed", resp.value()); - } - { - helper = std::make_unique(); - auto url = "/download?host=127.0.0.1&port=9000&path=/data&parts=1&space=1"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile download failed", resp.value()); - } -} - -} // namespace storage -} // namespace nebula - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - - ::testing::AddGlobalTestEnvironment(new nebula::storage::StorageHttpDownloadHandlerTestEnv()); - - return RUN_ALL_TESTS(); -} diff --git a/src/storage/test/StorageHttpIngestHandlerTest.cpp b/src/storage/test/StorageHttpIngestHandlerTest.cpp deleted file mode 100644 index 0c18c19741e..00000000000 --- a/src/storage/test/StorageHttpIngestHandlerTest.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/* Copyright (c) 2019 
vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include -#include - -#include "common/base/Base.h" -#include "common/fs/TempDir.h" -#include "common/http/HttpClient.h" -#include "mock/MockCluster.h" -#include "mock/MockData.h" -#include "storage/http/StorageHttpIngestHandler.h" -#include "webservice/Router.h" -#include "webservice/WebService.h" - -namespace nebula { -namespace storage { - -class StorageHttpIngestHandlerTestEnv : public ::testing::Environment { - public: - void SetUp() override { - FLAGS_ws_ip = "127.0.0.1"; - FLAGS_ws_http_port = 0; - VLOG(1) << "Starting web service..."; - - rootPath_ = std::make_unique("/tmp/StorageHttpIngestHandler.XXXXXX"); - cluster_ = std::make_unique(); - cluster_->initStorageKV(rootPath_->path()); - - auto partPath = folly::stringPrintf("%s/disk1/nebula/0/download/0", rootPath_->path()); - ASSERT_TRUE(nebula::fs::FileUtils::makeDir(partPath)); - - auto options = rocksdb::Options(); - auto env = rocksdb::EnvOptions(); - rocksdb::SstFileWriter writer{env, options}; - - auto sstPath = folly::stringPrintf("%s/data.sst", partPath.c_str()); - auto status = writer.Open(sstPath); - ASSERT_EQ(rocksdb::Status::OK(), status); - - for (auto i = 0; i < 10; i++) { - status = writer.Put(folly::stringPrintf("key_%d", i), folly::stringPrintf("val_%d", i)); - ASSERT_EQ(rocksdb::Status::OK(), status); - } - status = writer.Finish(); - ASSERT_EQ(rocksdb::Status::OK(), status); - - webSvc_ = std::make_unique(); - auto& router = webSvc_->router(); - router.get("/ingest").handler([this](nebula::web::PathParams&&) { - auto handler = new storage::StorageHttpIngestHandler(); - handler->init(cluster_->storageEnv_->kvstore_); - return handler; - }); - auto webStatus = webSvc_->start(); - ASSERT_TRUE(webStatus.ok()) << webStatus; - } - - void TearDown() override { - cluster_.reset(); - rootPath_.reset(); - webSvc_.reset(); - VLOG(1) << "Web service stopped"; - } - - private: - std::unique_ptr rootPath_; - std::unique_ptr cluster_; - std::unique_ptr webSvc_; -}; - -TEST(StorageHttpIngestHandlerTest, StorageIngestTest) { - { - auto url = "/ingest?space=1"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile ingest successfully", resp.value()); - } - { - auto url = "/ingest?space=0"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile ingest failed", resp.value()); - } -} - -} // namespace storage -} // namespace nebula - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - - ::testing::AddGlobalTestEnvironment(new nebula::storage::StorageHttpIngestHandlerTestEnv()); - - return RUN_ALL_TESTS(); -} diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index efc6bdb4d52..89a20559aed 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -1,5 +1,63 @@ # note: standalone version don't have dependent test tools for now. 
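+# Object-library dependency list shared by the tool targets below; the per-tool
+# CMakeLists now reference ${tools_test_deps} instead of repeating the full list.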
+set(tools_test_deps + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ +) + if(NOT ENABLE_STANDALONE_VERSION) nebula_add_subdirectory(storage-perf) nebula_add_subdirectory(simple-kv-verify) diff --git a/src/tools/db-dump/CMakeLists.txt b/src/tools/db-dump/CMakeLists.txt index 66084aa6ef3..62f9432c1fa 100644 --- a/src/tools/db-dump/CMakeLists.txt +++ b/src/tools/db-dump/CMakeLists.txt @@ -1,61 +1,3 @@ -set(tools_test_deps - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ -) - nebula_add_executable( NAME db_dump diff --git a/src/tools/db-upgrade/CMakeLists.txt b/src/tools/db-upgrade/CMakeLists.txt index 29ea63b0755..af0b1efbd26 100644 --- a/src/tools/db-upgrade/CMakeLists.txt +++ b/src/tools/db-upgrade/CMakeLists.txt @@ -8,60 +8,7 @@ nebula_add_executable( NebulaKeyUtilsV3.cpp DbUpgrader.cpp OBJECTS - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ + ${tools_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} diff --git a/src/tools/meta-dump/CMakeLists.txt b/src/tools/meta-dump/CMakeLists.txt index fbbe2c4b30f..47fbcc1be56 100644 --- a/src/tools/meta-dump/CMakeLists.txt +++ b/src/tools/meta-dump/CMakeLists.txt @@ -4,61 +4,7 @@ nebula_add_executable( SOURCES MetaDumpTool.cpp OBJECTS - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ + ${tools_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} diff --git a/src/tools/simple-kv-verify/CMakeLists.txt b/src/tools/simple-kv-verify/CMakeLists.txt index 1d752a3f671..5b5d3bdf220 100644 --- a/src/tools/simple-kv-verify/CMakeLists.txt +++ b/src/tools/simple-kv-verify/CMakeLists.txt @@ -4,59 +4,7 @@ nebula_add_executable( SOURCES SimpleKVVerifyTool.cpp OBJECTS - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ + ${tools_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} diff --git a/src/tools/storage-perf/CMakeLists.txt b/src/tools/storage-perf/CMakeLists.txt index 2e85f342958..cccddaaadb2 100644 --- a/src/tools/storage-perf/CMakeLists.txt +++ b/src/tools/storage-perf/CMakeLists.txt @@ -1,67 +1,10 @@ -set(perf_test_deps - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ -) - nebula_add_executable( NAME storage_perf SOURCES StoragePerfTool.cpp OBJECTS - ${perf_test_deps} + ${tools_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} @@ -84,7 +27,7 @@ nebula_add_executable( SOURCES StorageIntegrityTool.cpp OBJECTS - ${perf_test_deps} + ${tools_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES}
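Note on the overall flow introduced by this patch: the removed /download and /ingest HTTP endpoints are replaced by job statements. "SUBMIT JOB DOWNLOAD HDFS \"hdfs://host:port/path\"" and "SUBMIT JOB INGEST" (see the parser.yy and ParserTest changes above) are dispatched by the meta JobManager to DownloadJobExecutor and IngestJobExecutor, which fan the work out to the storage-side DownloadTask and IngestTask. The standalone sketch below only mirrors the per-partition sub-task pattern that DownloadTask::genSubTasks() uses; ToyDownloadTask, the reduced ErrorCode enum and the printed output are illustrative stand-ins and are not code from this patch.

#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Toy stand-ins for the nebula types; only the shape of the pattern matters here.
enum class ErrorCode { SUCCEEDED, E_TASK_EXECUTION_FAILED };
using GraphSpaceID = int32_t;
using PartitionID = int32_t;
using TaskFunction = std::function<ErrorCode()>;  // same shape as the alias added to AdminTask.h

class ToyDownloadTask {
 public:
  ToyDownloadTask(GraphSpaceID space, std::vector<PartitionID> parts, std::string hdfsPath)
      : space_(space), parts_(std::move(parts)), hdfsPath_(std::move(hdfsPath)) {}

  // One nullary sub task per partition, bound with std::bind as in DownloadTask::genSubTasks().
  std::vector<TaskFunction> genSubTasks() {
    std::vector<TaskFunction> tasks;
    for (auto part : parts_) {
      tasks.emplace_back(std::bind(&ToyDownloadTask::subTask, this, space_, part));
    }
    return tasks;
  }

 private:
  ErrorCode subTask(GraphSpaceID space, PartitionID part) {
    // The real sub task copies <hdfsPath>/<part> into the engine's download directory;
    // here we only print what would be fetched.
    std::cout << "space " << space << " part " << part << " <- " << hdfsPath_ << "/" << part
              << "\n";
    return ErrorCode::SUCCEEDED;
  }

  GraphSpaceID space_;
  std::vector<PartitionID> parts_;
  std::string hdfsPath_;
};

int main() {
  ToyDownloadTask task(1, {1, 2, 3}, "hdfs://127.0.0.1:9000/test_space");
  for (auto& sub : task.genSubTasks()) {
    if (sub() != ErrorCode::SUCCEEDED) {
      return 1;
    }
  }
  return 0;
}

Packing each partition into a nullary TaskFunction keeps the scheduling side generic: the admin task manager can run or cancel sub tasks without knowing whether they download, ingest, flush or rebuild indexes.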