Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Support specified columns for full-text index #460

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 14 additions & 23 deletions src/kvstore/plugins/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ bool ESListener::appendDocItem(std::vector<DocItem>& items, const KV& kv) const

bool ESListener::appendEdgeDocItem(std::vector<DocItem>& items, const KV& kv) const {
auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, kv.first);
auto schema = schemaMan_->getEdgeSchema(spaceId_, edgeType);
if (schema == nullptr) {
VLOG(3) << "get edge schema failed, edgeType " << edgeType;
return false;
auto ftIndex = schemaMan_->getFTIndex(spaceId_, edgeType);
if (!ftIndex.ok()) {
VLOG(3) << "get text search index failed";
return (ftIndex.status() == nebula::Status::IndexNotFound()) ? true : false;
}
auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_,
spaceId_,
Expand All @@ -167,15 +167,15 @@ bool ESListener::appendEdgeDocItem(std::vector<DocItem>& items, const KV& kv) co
VLOG(3) << "get edge reader failed, schema ID " << edgeType;
return false;
}
return appendDocs(items, schema.get(), reader.get(), edgeType, true);
return appendDocs(items, reader.get(), std::move(ftIndex).value());
}

bool ESListener::appendTagDocItem(std::vector<DocItem>& items, const KV& kv) const {
auto tagId = NebulaKeyUtils::getTagId(vIdLen_, kv.first);
auto schema = schemaMan_->getTagSchema(spaceId_, tagId);
if (schema == nullptr) {
VLOG(3) << "get tag schema failed, tagId " << tagId;
return false;
auto ftIndex = schemaMan_->getFTIndex(spaceId_, tagId);
if (!ftIndex.ok()) {
VLOG(3) << "get text search index failed";
return (ftIndex.status() == nebula::Status::IndexNotFound()) ? true : false;
}
auto reader = RowReaderWrapper::getTagPropReader(schemaMan_,
spaceId_,
Expand All @@ -185,27 +185,18 @@ bool ESListener::appendTagDocItem(std::vector<DocItem>& items, const KV& kv) con
VLOG(3) << "get tag reader failed, tagID " << tagId;
return false;
}
return appendDocs(items, schema.get(), reader.get(), tagId, false);
return appendDocs(items, reader.get(), std::move(ftIndex).value());
}

bool ESListener::appendDocs(std::vector<DocItem>& items,
const meta::SchemaProviderIf* schema,
RowReader* reader,
int32_t schemaId,
bool isEdge) const {
auto count = schema->getNumFields();
for (size_t i = 0; i < count; i++) {
auto name = schema->getFieldName(i);
auto v = reader->getValueByName(name);
const std::pair<std::string, nebula::meta::cpp2::FTIndex>& fti) const {
for (const auto& field : fti.second.get_fields()) {
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
continue;
}
auto ftIndex = nebula::plugin::IndexTraits::indexName(*spaceName_, isEdge);
items.emplace_back(DocItem(std::move(ftIndex),
std::move(name),
partId_,
schemaId,
std::move(v).getStr()));
items.emplace_back(DocItem(fti.first, field, partId_, std::move(v).getStr()));
}
return true;
}
Expand Down
7 changes: 2 additions & 5 deletions src/kvstore/plugins/elasticsearch/ESListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,8 @@ class ESListener : public Listener {

bool appendTagDocItem(std::vector<DocItem>& items, const KV& kv) const;

bool appendDocs(std::vector<DocItem>& items,
const meta::SchemaProviderIf* schema,
RowReader* reader,
int32_t schemaId,
bool isEdge) const;
bool appendDocs(std::vector<DocItem>& items, RowReader* reader,
const std::pair<std::string, nebula::meta::cpp2::FTIndex>& fti) const;

bool writeData(const std::vector<nebula::plugin::DocItem>& items) const;

Expand Down
1 change: 1 addition & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ nebula_add_library(
processors/indexMan/GetEdgeIndexProcessor.cpp
processors/indexMan/ListEdgeIndexesProcessor.cpp
processors/indexMan/FTServiceProcessor.cpp
processors/indexMan/FTIndexProcessor.cpp
processors/customKV/GetProcessor.cpp
processors/customKV/MultiGetProcessor.cpp
processors/customKV/MultiPutProcessor.cpp
Expand Down
19 changes: 19 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "meta/processors/indexMan/GetEdgeIndexProcessor.h"
#include "meta/processors/indexMan/ListEdgeIndexesProcessor.h"
#include "meta/processors/indexMan/FTServiceProcessor.h"
#include "meta/processors/indexMan/FTIndexProcessor.h"
#include "meta/processors/customKV/MultiPutProcessor.h"
#include "meta/processors/customKV/GetProcessor.h"
#include "meta/processors/customKV/MultiGetProcessor.h"
Expand Down Expand Up @@ -305,6 +306,24 @@ MetaServiceHandler::future_listFTClients(const cpp2::ListFTClientsReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_createFTIndex(const cpp2::CreateFTIndexReq& req) {
auto* processor = CreateFTIndexProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_dropFTIndex(const cpp2::DropFTIndexReq& req) {
auto* processor = DropFTIndexProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ListFTIndexesResp>
MetaServiceHandler::future_listFTIndexes(const cpp2::ListFTIndexesReq& req) {
auto* processor = ListFTIndexesProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::HBResp>
MetaServiceHandler::future_heartBeat(const cpp2::HBReq& req) {
auto* processor = HBProcessor::instance(kvstore_, &kHBCounters, clusterId_);
Expand Down
9 changes: 9 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListFTClientsResp>
future_listFTClients(const cpp2::ListFTClientsReq& req) override;

folly::Future<cpp2::ExecResp>
future_createFTIndex(const cpp2::CreateFTIndexReq& req) override;

folly::Future<cpp2::ExecResp>
future_dropFTIndex(const cpp2::DropFTIndexReq& req) override;

folly::Future<cpp2::ListFTIndexesResp>
future_listFTIndexes(const cpp2::ListFTIndexesReq& req) override;

/**
* User manager
**/
Expand Down
29 changes: 29 additions & 0 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1397,5 +1397,34 @@ meta::cpp2::Session MetaServiceUtils::parseSessionVal(const folly::StringPiece &
apache::thrift::CompactSerializer::deserialize(val, session);
return session;
}

std::string MetaServiceUtils::fulltextIndexKey(const std::string& indexName) {
std::string key;
key.reserve(kFTIndexTable.size() + indexName.size());
key.append(kFTIndexTable.data(), kFTIndexTable.size())
.append(indexName);
return key;
}

std::string MetaServiceUtils::fulltextIndexVal(const cpp2::FTIndex& index) {
std::string val;
apache::thrift::CompactSerializer::serialize(index, &val);
return val;
}

std::string MetaServiceUtils::parsefulltextIndexName(folly::StringPiece key) {
return key.subpiece(kFTIndexTable.size(), key.size()).toString();
}

cpp2::FTIndex MetaServiceUtils::parsefulltextIndex(folly::StringPiece val) {
cpp2::FTIndex ftIndex;
apache::thrift::CompactSerializer::deserialize(val, ftIndex);
return ftIndex;
}

std::string MetaServiceUtils::fulltextIndexPrefix() {
return kFTIndexTable;
}

} // namespace meta
} // namespace nebula
10 changes: 10 additions & 0 deletions src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,16 @@ class MetaServiceUtils final {

static meta::cpp2::Session parseSessionVal(const folly::StringPiece &val);

static std::string fulltextIndexKey(const std::string& indexName);

static std::string fulltextIndexVal(const cpp2::FTIndex& index);

static std::string parsefulltextIndexName(folly::StringPiece key);

static cpp2::FTIndex parsefulltextIndex(folly::StringPiece val);

static std::string fulltextIndexPrefix();

static std::string genTimestampStr();

static GraphSpaceID parseEdgesKeySpaceID(folly::StringPiece key);
Expand Down
7 changes: 7 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,16 @@ class BaseProcessor {
indexCheck(const std::vector<cpp2::IndexItem>& items,
const std::vector<cpp2::AlterSchemaItem>& alterItems);

nebula::cpp2::ErrorCode
ftIndexCheck(const std::vector<std::string>& cols,
const std::vector<cpp2::AlterSchemaItem>& alterItems);

ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::IndexItem>>
getIndexes(GraphSpaceID spaceId, int32_t tagOrEdge);

ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex>
getFTIndex(GraphSpaceID spaceId, int32_t tagOrEdge);

bool checkIndexExist(const std::vector<cpp2::IndexFieldDef>& fields,
const cpp2::IndexItem& item);

Expand Down
47 changes: 46 additions & 1 deletion src/meta/processors/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,31 @@ BaseProcessor<RESP>::getIndexes(GraphSpaceID spaceId, int32_t tagOrEdge) {
}
return items;
}
template<typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex>
BaseProcessor<RESP>::getFTIndex(GraphSpaceID spaceId, int32_t tagOrEdge) {
const auto& indexPrefix = MetaServiceUtils::fulltextIndexPrefix();
auto iterRet = doPrefix(indexPrefix);
if (!nebula::ok(iterRet)) {
auto retCode = nebula::error(iterRet);
LOG(ERROR) << "Tag or edge fulltext index prefix failed, error :"
<< apache::thrift::util::enumNameSafe(retCode);
return retCode;
}
auto indexIter = nebula::value(iterRet).get();

while (indexIter->valid()) {
auto index = MetaServiceUtils::parsefulltextIndex(indexIter->val());
auto id = index.get_depend_schema().getType() == cpp2::SchemaID::Type::edge_type
? index.get_depend_schema().get_edge_type()
: index.get_depend_schema().get_tag_id();
if (spaceId == index.get_space_id() && tagOrEdge == id) {
return index;
}
indexIter->next();
}
return nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND;
}

template<typename RESP>
nebula::cpp2::ErrorCode
Expand Down Expand Up @@ -471,7 +495,28 @@ BaseProcessor<RESP>::indexCheck(const std::vector<cpp2::IndexItem>& items,
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

template<typename RESP>
nebula::cpp2::ErrorCode
BaseProcessor<RESP>::ftIndexCheck(const std::vector<std::string>& cols,
const std::vector<cpp2::AlterSchemaItem>& alterItems) {
for (const auto& item : alterItems) {
if (*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::CHANGE ||
*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::DROP) {
const auto& itemCols = item.get_schema().get_columns();
for (const auto& iCol : itemCols) {
auto it = std::find_if(cols.begin(), cols.end(),
[&] (const auto& c) {
return c == iCol.name;
});
if (it != cols.end()) {
LOG(ERROR) << "fulltext index conflict";
return nebula::cpp2::ErrorCode::E_CONFLICT;
}
}
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

template<typename RESP>
bool BaseProcessor<RESP>::checkIndexExist(const std::vector<cpp2::IndexFieldDef>& fields,
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ GENERATE_LOCK(edge);
GENERATE_LOCK(tagIndex);
GENERATE_LOCK(edgeIndex);
GENERATE_LOCK(fulltextServices);
GENERATE_LOCK(fulltextIndex);
GENERATE_LOCK(user);
GENERATE_LOCK(config);
GENERATE_LOCK(snapshot);
Expand Down
20 changes: 19 additions & 1 deletion src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "common/base/CommonMacro.h"
#include "meta/processors/indexMan/CreateEdgeIndexProcessor.h"

namespace nebula {
Expand Down Expand Up @@ -127,13 +128,30 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
return;
}
cpp2::ColumnDef col = *iter;
if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (col.type.get_type() == meta::cpp2::PropertyType::FIXED_STRING) {
if (*col.type.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
bright-starry-sky marked this conversation as resolved.
Show resolved Hide resolved
handleErrorCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
} else if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (!field.type_length_ref().has_value()) {
LOG(ERROR) << "No type length set : " << field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}
if (*field.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
col.type.set_type(meta::cpp2::PropertyType::FIXED_STRING);
col.type.set_type_length(*field.get_type_length());
} else if (field.type_length_ref().has_value()) {
Expand Down
20 changes: 19 additions & 1 deletion src/meta/processors/indexMan/CreateTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "common/base/CommonMacro.h"
#include "meta/processors/indexMan/CreateTagIndexProcessor.h"

namespace nebula {
Expand Down Expand Up @@ -126,13 +127,30 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) {
return;
}
cpp2::ColumnDef col = *iter;
if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (col.type.get_type() == meta::cpp2::PropertyType::FIXED_STRING) {
if (*col.type.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
} else if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (!field.type_length_ref().has_value()) {
LOG(ERROR) << "No type length set : " << field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}
if (*field.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
col.type.set_type(meta::cpp2::PropertyType::FIXED_STRING);
col.type.set_type_length(*field.get_type_length());
} else if (field.type_length_ref().has_value()) {
Expand Down
Loading