Skip to content

Commit

Permalink
download and ingest job
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Mar 26, 2022
1 parent 5d96c60 commit b3c8a70
Show file tree
Hide file tree
Showing 85 changed files with 740 additions and 2,137 deletions.
4 changes: 1 addition & 3 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/RootUserMan.h"
#include "meta/http/MetaHttpDownloadHandler.h"
#include "meta/http/MetaHttpIngestHandler.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand Down Expand Up @@ -152,7 +150,7 @@ int main(int argc, char* argv[]) {
pool->start(FLAGS_meta_http_thread_num, "http thread pool");

auto webSvc = std::make_unique<nebula::WebService>();
status = initWebService(webSvc.get(), gKVStore.get(), helper.get(), pool.get());
status = initWebService(webSvc.get(), gKVStore.get());
if (!status.ok()) {
LOG(ERROR) << "Init web service failed: " << status;
return EXIT_FAILURE;
Expand Down
17 changes: 1 addition & 16 deletions src/daemons/MetaDaemonInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
#include "meta/KVBasedClusterIdMan.h"
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/http/MetaHttpDownloadHandler.h"
#include "meta/http/MetaHttpIngestHandler.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand Down Expand Up @@ -160,22 +158,9 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
return kvstore;
}

nebula::Status initWebService(nebula::WebService* svc,
nebula::kvstore::KVStore* kvstore,
nebula::hdfs::HdfsCommandHelper* helper,
nebula::thread::GenericThreadPool* pool) {
nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore) {
LOG(INFO) << "Starting Meta HTTP Service";
auto& router = svc->router();
router.get("/download-dispatch").handler([kvstore, helper, pool](PathParams&&) {
auto handler = new nebula::meta::MetaHttpDownloadHandler();
handler->init(kvstore, helper, pool);
return handler;
});
router.get("/ingest-dispatch").handler([kvstore, pool](PathParams&&) {
auto handler = new nebula::meta::MetaHttpIngestHandler();
handler->init(kvstore, pool);
return handler;
});
router.get("/replace").handler([kvstore](PathParams&&) {
auto handler = new nebula::meta::MetaHttpReplaceHostHandler();
handler->init(kvstore);
Expand Down
5 changes: 1 addition & 4 deletions src/daemons/MetaDaemonInit.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,5 @@ nebula::ClusterID& metaClusterId();
std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> peers,
nebula::HostAddr localhost);

nebula::Status initWebService(nebula::WebService* svc,
nebula::kvstore::KVStore* kvstore,
nebula::hdfs::HdfsCommandHelper* helper,
nebula::thread::GenericThreadPool* pool);
nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore);
#endif
7 changes: 1 addition & 6 deletions src/daemons/StandAloneDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/RootUserMan.h"
#include "meta/http/MetaHttpDownloadHandler.h"
#include "meta/http/MetaHttpIngestHandler.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand Down Expand Up @@ -209,11 +207,8 @@ int main(int argc, char *argv[]) {
return;
}
LOG(INFO) << "Start http service";
auto helper = std::make_unique<nebula::hdfs::HdfsCommandHelper>();
auto pool = std::make_unique<nebula::thread::GenericThreadPool>();
pool->start(FLAGS_meta_http_thread_num, "http thread pool");
auto webSvc = std::make_unique<nebula::WebService>();
status = initWebService(webSvc.get(), gMetaKVStore.get(), helper.get(), pool.get());
status = initWebService(webSvc.get(), gMetaKVStore.get());
if (!status.ok()) {
LOG(ERROR) << "Init web service failed: " << status;
return;
Expand Down
2 changes: 0 additions & 2 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ nebula_add_library(
admin/PartExecutor.cpp
admin/CharsetExecutor.cpp
admin/ShowStatsExecutor.cpp
admin/DownloadExecutor.cpp
admin/IngestExecutor.cpp
admin/ConfigExecutor.cpp
admin/ZoneExecutor.cpp
admin/ShowServiceClientsExecutor.cpp
Expand Down
8 changes: 0 additions & 8 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
#include "graph/executor/admin/ConfigExecutor.h"
#include "graph/executor/admin/CreateUserExecutor.h"
#include "graph/executor/admin/DescribeUserExecutor.h"
#include "graph/executor/admin/DownloadExecutor.h"
#include "graph/executor/admin/DropHostsExecutor.h"
#include "graph/executor/admin/DropUserExecutor.h"
#include "graph/executor/admin/GrantRoleExecutor.h"
#include "graph/executor/admin/IngestExecutor.h"
#include "graph/executor/admin/KillQueryExecutor.h"
#include "graph/executor/admin/ListRolesExecutor.h"
#include "graph/executor/admin/ListUserRolesExecutor.h"
Expand Down Expand Up @@ -518,12 +516,6 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kSignOutService: {
return pool->add(new SignOutServiceExecutor(node, qctx));
}
case PlanNode::Kind::kDownload: {
return pool->add(new DownloadExecutor(node, qctx));
}
case PlanNode::Kind::kIngest: {
return pool->add(new IngestExecutor(node, qctx));
}
case PlanNode::Kind::kShowSessions: {
return pool->add(new ShowSessionsExecutor(node, qctx));
}
Expand Down
33 changes: 0 additions & 33 deletions src/graph/executor/admin/DownloadExecutor.cpp

This file was deleted.

25 changes: 0 additions & 25 deletions src/graph/executor/admin/DownloadExecutor.h

This file was deleted.

28 changes: 0 additions & 28 deletions src/graph/executor/admin/IngestExecutor.cpp

This file was deleted.

25 changes: 0 additions & 25 deletions src/graph/executor/admin/IngestExecutor.h

This file was deleted.

49 changes: 0 additions & 49 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,55 +537,6 @@ class ShowListener final : public SingleDependencyNode {
: SingleDependencyNode(qctx, Kind::kShowListener, input) {}
};

class Download final : public SingleDependencyNode {
public:
static Download* make(QueryContext* qctx,
PlanNode* input,
std::string hdfsHost,
int32_t hdfsPort,
std::string hdfsPath) {
return qctx->objPool()->add(new Download(qctx, input, hdfsHost, hdfsPort, hdfsPath));
}

const std::string& getHdfsHost() const {
return hdfsHost_;
}

int32_t getHdfsPort() const {
return hdfsPort_;
}

const std::string& getHdfsPath() const {
return hdfsPath_;
}

private:
Download(QueryContext* qctx,
PlanNode* dep,
std::string hdfsHost,
int32_t hdfsPort,
std::string hdfsPath)
: SingleDependencyNode(qctx, Kind::kDownload, dep),
hdfsHost_(hdfsHost),
hdfsPort_(hdfsPort),
hdfsPath_(hdfsPath) {}

private:
std::string hdfsHost_;
int32_t hdfsPort_;
std::string hdfsPath_;
};

class Ingest final : public SingleDependencyNode {
public:
static Ingest* make(QueryContext* qctx, PlanNode* dep) {
return qctx->objPool()->add(new Ingest(qctx, dep));
}

private:
Ingest(QueryContext* qctx, PlanNode* dep) : SingleDependencyNode(qctx, Kind::kIngest, dep) {}
};

// User related Node
class CreateUser final : public CreateNode {
public:
Expand Down
4 changes: 0 additions & 4 deletions src/graph/planner/plan/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,6 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "SignInService";
case Kind::kSignOutService:
return "SignOutService";
case Kind::kDownload:
return "Download";
case Kind::kIngest:
return "Ingest";
case Kind::kShowSessions:
return "ShowSessions";
case Kind::kUpdateSession:
Expand Down
2 changes: 0 additions & 2 deletions src/graph/planner/plan/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,6 @@ class PlanNode {
kShowFTIndexes,
kSignInService,
kSignOutService,
kDownload,
kIngest,
kShowSessions,
kUpdateSession,

Expand Down
4 changes: 1 addition & 3 deletions src/graph/service/PermissionCheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace graph {
/**
* Read space : kUse, kDescribeSpace
* Write space : kCreateSpace, kDropSpace, kClearSpace, kCreateSnapshot,
* kDropSnapshot, kBalance, kAdmin, kConfig, kIngest, kDownload
* kDropSnapshot, kBalance, kAdmin, kConfig
* Read schema : kDescribeTag, kDescribeEdge,
* kDescribeTagIndex, kDescribeEdgeIndex
* Write schema : kCreateTag, kAlterTag, kCreateEdge,
Expand Down Expand Up @@ -68,8 +68,6 @@ namespace graph {
case Sentence::Kind::kShowConfigs:
case Sentence::Kind::kSetConfig:
case Sentence::Kind::kGetConfig:
case Sentence::Kind::kIngest:
case Sentence::Kind::kDownload:
case Sentence::Kind::kSignOutService:
case Sentence::Kind::kSignInService: {
return PermissionManager::canWriteSpace(session);
Expand Down
6 changes: 2 additions & 4 deletions src/graph/validator/AdminJobValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ class AdminJobValidator final : public Validator {
case meta::cpp2::JobType::STATS:
case meta::cpp2::JobType::COMPACT:
case meta::cpp2::JobType::FLUSH:
case meta::cpp2::JobType::DOWNLOAD:
case meta::cpp2::JobType::INGEST:
case meta::cpp2::JobType::DATA_BALANCE:
case meta::cpp2::JobType::LEADER_BALANCE:
case meta::cpp2::JobType::ZONE_BALANCE:
return true;
// TODO: download and ingest need to be refactored to use the rpc protocol.
// Currently they are using their own validator
case meta::cpp2::JobType::DOWNLOAD:
case meta::cpp2::JobType::INGEST:
case meta::cpp2::JobType::UNKNOWN:
return false;
}
Expand Down
2 changes: 0 additions & 2 deletions src/graph/validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ nebula_add_library(
FindPathValidator.cpp
LookupValidator.cpp
MatchValidator.cpp
DownloadValidator.cpp
IngestValidator.cpp
)

nebula_add_subdirectory(test)
30 changes: 0 additions & 30 deletions src/graph/validator/DownloadValidator.cpp

This file was deleted.

Loading

0 comments on commit b3c8a70

Please sign in to comment.