Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
bright-starry-sky committed May 13, 2021
1 parent 8f47888 commit 5dcd4ef
Show file tree
Hide file tree
Showing 20 changed files with 1,011 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ nebula_add_library(
processors/indexMan/GetEdgeIndexProcessor.cpp
processors/indexMan/ListEdgeIndexesProcessor.cpp
processors/indexMan/FTServiceProcessor.cpp
processors/indexMan/FTIndexProcessor.cpp
processors/customKV/GetProcessor.cpp
processors/customKV/MultiGetProcessor.cpp
processors/customKV/MultiPutProcessor.cpp
Expand Down
19 changes: 19 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "meta/processors/indexMan/GetEdgeIndexProcessor.h"
#include "meta/processors/indexMan/ListEdgeIndexesProcessor.h"
#include "meta/processors/indexMan/FTServiceProcessor.h"
#include "meta/processors/indexMan/FTIndexProcessor.h"
#include "meta/processors/customKV/MultiPutProcessor.h"
#include "meta/processors/customKV/GetProcessor.h"
#include "meta/processors/customKV/MultiGetProcessor.h"
Expand Down Expand Up @@ -305,6 +306,24 @@ MetaServiceHandler::future_listFTClients(const cpp2::ListFTClientsReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_createFTIndex(const cpp2::CreateFTIndexReq& req) {
auto* processor = CreateFTIndexProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_dropFTIndex(const cpp2::DropFTIndexReq& req) {
auto* processor = DropFTIndexProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ListFTIndexesResp>
MetaServiceHandler::future_listFTIndexes(const cpp2::ListFTIndexesReq& req) {
auto* processor = ListFTIndexesProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::HBResp>
MetaServiceHandler::future_heartBeat(const cpp2::HBReq& req) {
auto* processor = HBProcessor::instance(kvstore_, &kHBCounters, clusterId_);
Expand Down
9 changes: 9 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListFTClientsResp>
future_listFTClients(const cpp2::ListFTClientsReq& req) override;

folly::Future<cpp2::ExecResp>
future_createFTIndex(const cpp2::CreateFTIndexReq& req) override;

folly::Future<cpp2::ExecResp>
future_dropFTIndex(const cpp2::DropFTIndexReq& req) override;

folly::Future<cpp2::ListFTIndexesResp>
future_listFTIndexes(const cpp2::ListFTIndexesReq& req) override;

/**
* User manager
**/
Expand Down
29 changes: 29 additions & 0 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1397,5 +1397,34 @@ meta::cpp2::Session MetaServiceUtils::parseSessionVal(const folly::StringPiece &
apache::thrift::CompactSerializer::deserialize(val, session);
return session;
}

std::string MetaServiceUtils::fulltextIndexKey(const std::string& indexName) {
std::string key;
key.reserve(kFTIndexTable.size() + indexName.size());
key.append(kFTIndexTable.data(), kFTIndexTable.size())
.append(indexName);
return key;
}

std::string MetaServiceUtils::fulltextIndexVal(const cpp2::FTIndex& index) {
std::string val;
apache::thrift::CompactSerializer::serialize(index, &val);
return val;
}

std::string MetaServiceUtils::parsefulltextIndexName(folly::StringPiece key) {
return key.subpiece(kFTIndexTable.size(), key.size()).toString();
}

cpp2::FTIndex MetaServiceUtils::parsefulltextIndex(folly::StringPiece val) {
cpp2::FTIndex ftIndex;
apache::thrift::CompactSerializer::deserialize(val, ftIndex);
return ftIndex;
}

std::string MetaServiceUtils::fulltextIndexPrefix() {
return kFTIndexTable;
}

} // namespace meta
} // namespace nebula
10 changes: 10 additions & 0 deletions src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,16 @@ class MetaServiceUtils final {

static meta::cpp2::Session parseSessionVal(const folly::StringPiece &val);

static std::string fulltextIndexKey(const std::string& indexName);

static std::string fulltextIndexVal(const cpp2::FTIndex& index);

static std::string parsefulltextIndexName(folly::StringPiece key);

static cpp2::FTIndex parsefulltextIndex(folly::StringPiece val);

static std::string fulltextIndexPrefix();

static std::string genTimestampStr();

static GraphSpaceID parseEdgesKeySpaceID(folly::StringPiece key);
Expand Down
7 changes: 7 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,16 @@ class BaseProcessor {
indexCheck(const std::vector<cpp2::IndexItem>& items,
const std::vector<cpp2::AlterSchemaItem>& alterItems);

nebula::cpp2::ErrorCode
ftIndexCheck(const std::vector<std::string>& cols,
const std::vector<cpp2::AlterSchemaItem>& alterItems);

ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::IndexItem>>
getIndexes(GraphSpaceID spaceId, int32_t tagOrEdge);

ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex>
getFTIndex(GraphSpaceID spaceId, int32_t tagOrEdge);

bool checkIndexExist(const std::vector<cpp2::IndexFieldDef>& fields,
const cpp2::IndexItem& item);

Expand Down
47 changes: 46 additions & 1 deletion src/meta/processors/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,31 @@ BaseProcessor<RESP>::getIndexes(GraphSpaceID spaceId, int32_t tagOrEdge) {
}
return items;
}
template<typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex>
BaseProcessor<RESP>::getFTIndex(GraphSpaceID spaceId, int32_t tagOrEdge) {
const auto& indexPrefix = MetaServiceUtils::fulltextIndexPrefix();
auto iterRet = doPrefix(indexPrefix);
if (!nebula::ok(iterRet)) {
auto retCode = nebula::error(iterRet);
LOG(ERROR) << "Tag or edge fulltext index prefix failed, error :"
<< apache::thrift::util::enumNameSafe(retCode);
return retCode;
}
auto indexIter = nebula::value(iterRet).get();

while (indexIter->valid()) {
auto index = MetaServiceUtils::parsefulltextIndex(indexIter->val());
auto id = index.get_depend_schema().getType() == cpp2::SchemaID::Type::edge_type
? index.get_depend_schema().get_edge_type()
: index.get_depend_schema().get_tag_id();
if (spaceId == index.get_space_id() && tagOrEdge == id) {
return index;
}
indexIter->next();
}
return nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND;
}

template<typename RESP>
nebula::cpp2::ErrorCode
Expand Down Expand Up @@ -471,7 +495,28 @@ BaseProcessor<RESP>::indexCheck(const std::vector<cpp2::IndexItem>& items,
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

template<typename RESP>
nebula::cpp2::ErrorCode
BaseProcessor<RESP>::ftIndexCheck(const std::vector<std::string>& cols,
const std::vector<cpp2::AlterSchemaItem>& alterItems) {
for (const auto& item : alterItems) {
if (*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::CHANGE ||
*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::DROP) {
const auto& itemCols = item.get_schema().get_columns();
for (const auto& iCol : itemCols) {
auto it = std::find_if(cols.begin(), cols.end(),
[&] (const auto& c) {
return c == iCol.name;
});
if (it != cols.end()) {
LOG(ERROR) << "fulltext index conflict";
return nebula::cpp2::ErrorCode::E_CONFLICT;
}
}
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

template<typename RESP>
bool BaseProcessor<RESP>::checkIndexExist(const std::vector<cpp2::IndexFieldDef>& fields,
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ GENERATE_LOCK(edge);
GENERATE_LOCK(tagIndex);
GENERATE_LOCK(edgeIndex);
GENERATE_LOCK(fulltextServices);
GENERATE_LOCK(fulltextIndex);
GENERATE_LOCK(user);
GENERATE_LOCK(config);
GENERATE_LOCK(snapshot);
Expand Down
18 changes: 17 additions & 1 deletion src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "common/base/CommonMacro.h"
#include "meta/processors/indexMan/CreateEdgeIndexProcessor.h"

namespace nebula {
Expand Down Expand Up @@ -126,13 +127,28 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
return;
}
cpp2::ColumnDef col = *iter;
if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (col.type.get_type() == meta::cpp2::PropertyType::FIXED_STRING) {
if (*col.type.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
return;
}
} else if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (!field.type_length_ref().has_value()) {
LOG(ERROR) << "No type length set : " << field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}
if (*field.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
col.type.set_type(meta::cpp2::PropertyType::FIXED_STRING);
col.type.set_type_length(*field.get_type_length());
} else if (field.type_length_ref().has_value()) {
Expand Down
20 changes: 19 additions & 1 deletion src/meta/processors/indexMan/CreateTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "common/base/CommonMacro.h"
#include "meta/processors/indexMan/CreateTagIndexProcessor.h"

namespace nebula {
Expand Down Expand Up @@ -125,13 +126,30 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) {
return;
}
cpp2::ColumnDef col = *iter;
if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (col.type.get_type() == meta::cpp2::PropertyType::FIXED_STRING) {
if (*col.type.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
} else if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (!field.type_length_ref().has_value()) {
LOG(ERROR) << "No type length set : " << field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}
if (*field.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
col.type.set_type(meta::cpp2::PropertyType::FIXED_STRING);
col.type.set_type_length(*field.get_type_length());
} else if (field.type_length_ref().has_value()) {
Expand Down
Loading

0 comments on commit 5dcd4ef

Please sign in to comment.