
Commit

lookup limit
bright-starry-sky committed Sep 6, 2021
1 parent fb6c911 commit af1d135
Showing 10 changed files with 215 additions and 130 deletions.
10 changes: 8 additions & 2 deletions src/clients/storage/GraphStorageClient.cpp
@@ -451,11 +451,18 @@ folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient:
int32_t tagOrEdge,
const std::vector<std::string>& returnCols,
folly::EventBase* evb) {
// TODO(sky): have the graph layer pass nebula::cpp2::SchemaID instead of isEdge and tagOrEdge.
auto status = getHostParts(space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::LookupIndexResp>>(
std::runtime_error(status.status().toString()));
}
nebula::cpp2::SchemaID schemaId;
if (isEdge) {
schemaId.set_edge_type(tagOrEdge);
} else {
schemaId.set_tag_id(tagOrEdge);
}

auto& clusters = status.value();
std::unordered_map<HostAddr, cpp2::LookupIndexRequest> requests;
@@ -468,8 +475,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient:

cpp2::IndexSpec spec;
spec.set_contexts(contexts);
spec.set_is_edge(isEdge);
spec.set_tag_or_edge_id(tagOrEdge);
spec.set_schema_id(schemaId);

req.set_indices(spec);
}
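
The client now wraps its (isEdge, tagOrEdge) arguments into the SchemaID thrift union before building the IndexSpec. A minimal sketch of that conversion, factored into a hypothetical helper (not part of this commit; the helper name is illustrative) that the graph layer could adopt once the TODO above is addressed:

  // Hypothetical helper: wrap a tag id or an edge type into the SchemaID union.
  nebula::cpp2::SchemaID makeSchemaId(bool isEdge, int32_t tagOrEdge) {
    nebula::cpp2::SchemaID schemaId;
    if (isEdge) {
      schemaId.set_edge_type(tagOrEdge);  // edge lookup
    } else {
      schemaId.set_tag_id(tagOrEdge);     // tag (vertex) lookup
    }
    return schemaId;
  }
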
5 changes: 3 additions & 2 deletions src/interface/storage.thrift
@@ -509,8 +509,7 @@ struct IndexQueryContext {
struct IndexSpec {
// In order to union multiple indices, multiple index hints are allowed
1: required list<IndexQueryContext> contexts,
2: required bool is_edge,
3: required i32 tag_or_edge_id,
2: common.SchemaID schema_id,
}


@@ -521,6 +520,8 @@ struct LookupIndexRequest {
// The list of property names. Should not be empty.
// Support kVid and kTag for vertex, kSrc, kType, kRank and kDst for edge.
4: optional list<binary> return_columns,
// Max row count returned by each partition in this response; 0 means no limit.
5: i64 limit = 0,
}
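
With the new limit field, the caller can bound how many rows each partition contributes to a lookup response. A minimal usage sketch on the client side, assuming the standard thrift-cpp2 setters and mirroring the request construction in the tests further down; the ids and the limit value are illustrative, and the required contexts and partition fields are omitted for brevity:

  cpp2::LookupIndexRequest req;
  req.set_space_id(1);
  nebula::cpp2::SchemaID schemaId;
  schemaId.set_tag_id(1);                  // or schemaId.set_edge_type(...) for an edge index
  nebula::storage::cpp2::IndexSpec indices;
  indices.set_schema_id(schemaId);
  req.set_indices(indices);
  req.set_limit(100);                      // at most 100 rows per partition; 0 (the default) means no limit
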


11 changes: 9 additions & 2 deletions src/storage/exec/IndexEdgeNode.h
@@ -21,11 +21,13 @@ class IndexEdgeNode final : public RelNode<T> {
IndexEdgeNode(RuntimeContext* context,
IndexScanNode<T>* indexScanNode,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
const std::string& schemaName)
const std::string& schemaName,
int64_t limit = 0)
: context_(context),
indexScanNode_(indexScanNode),
schemas_(schemas),
schemaName_(schemaName) {}
schemaName_(schemaName),
limit_(limit) {}

nebula::cpp2::ErrorCode execute(PartitionID partId) override {
auto ret = RelNode<T>::execute(partId);
@@ -38,6 +40,7 @@ class IndexEdgeNode final : public RelNode<T> {
data_.clear();
std::vector<storage::cpp2::EdgeKey> edges;
auto* iter = static_cast<EdgeIndexIterator*>(indexScanNode_->iterator());
int64_t count = 0;
while (iter && iter->valid()) {
if (!iter->val().empty() && ttlProp.first) {
auto v = IndexKeyUtils::parseIndexTTL(iter->val());
@@ -54,6 +57,9 @@ class IndexEdgeNode final : public RelNode<T> {
edge.set_dst(iter->dstId());
edges.emplace_back(std::move(edge));
iter->next();
if (limit_ > 0 && ++count == limit_) {
break;
}
}
for (const auto& edge : edges) {
auto key = NebulaKeyUtils::edgeKey(context_->vIdLen(),
@@ -88,6 +94,7 @@ class IndexEdgeNode final : public RelNode<T> {
IndexScanNode<T>* indexScanNode_;
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas_;
const std::string& schemaName_;
int64_t limit_;
std::vector<kvstore::KV> data_;
};

10 changes: 8 additions & 2 deletions src/storage/exec/IndexScanNode.h
@@ -20,8 +20,9 @@ class IndexScanNode : public RelNode<T> {

IndexScanNode(RuntimeContext* context,
IndexID indexId,
std::vector<cpp2::IndexColumnHint> columnHints)
: context_(context), indexId_(indexId), columnHints_(std::move(columnHints)) {
std::vector<cpp2::IndexColumnHint> columnHints,
int64_t limit = 0)
: context_(context), indexId_(indexId), columnHints_(std::move(columnHints)), limit_(limit) {
/**
* columnHints's elements are {scanType = PREFIX|RANGE; beginStr; endStr},
* {scanType = PREFIX|RANGE; beginStr;
@@ -70,6 +71,7 @@ class IndexScanNode : public RelNode<T> {
auto* sh = context_->isEdge() ? context_->edgeSchema_ : context_->tagSchema_;
auto ttlProp = CommonUtils::ttlProps(sh);
data_.clear();
int64_t count = 0;
while (!!iter_ && iter_->valid()) {
if (!iter_->val().empty() && ttlProp.first) {
auto v = IndexKeyUtils::parseIndexTTL(iter_->val());
@@ -81,6 +83,9 @@
}
data_.emplace_back(iter_->key(), "");
iter_->next();
if (limit_ > 0 && ++count == limit_) {
break;
}
}
return std::move(data_);
}
@@ -168,6 +173,7 @@ class IndexScanNode : public RelNode<T> {
std::unique_ptr<IndexIterator> iter_;
std::pair<std::string, std::string> scanPair_;
std::vector<cpp2::IndexColumnHint> columnHints_;
int64_t limit_;
std::vector<kvstore::KV> data_;
};
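
The scan node now accepts an optional limit (default 0, meaning unlimited) and stops consuming the index iterator once that many keys have been collected for the current partition. A construction sketch under the new signature, mirroring how LookupBaseProcessor builds its plans later in this commit; the limit value is illustrative:

  auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
      context_.get(), indexId, std::move(colHints), /*limit=*/100);
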

11 changes: 9 additions & 2 deletions src/storage/exec/IndexVertexNode.h
@@ -21,11 +21,13 @@ class IndexVertexNode final : public RelNode<T> {
IndexVertexNode(RuntimeContext* context,
IndexScanNode<T>* indexScanNode,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
const std::string& schemaName)
const std::string& schemaName,
int64_t limit = 0)
: context_(context),
indexScanNode_(indexScanNode),
schemas_(schemas),
schemaName_(schemaName) {}
schemaName_(schemaName),
limit_(limit) {}

nebula::cpp2::ErrorCode execute(PartitionID partId) override {
auto ret = RelNode<T>::execute(partId);
@@ -38,6 +40,7 @@ class IndexVertexNode final : public RelNode<T> {
data_.clear();
std::vector<VertexID> vids;
auto* iter = static_cast<VertexIndexIterator*>(indexScanNode_->iterator());
int64_t count = 0;
while (iter && iter->valid()) {
if (!iter->val().empty() && ttlProp.first) {
auto v = IndexKeyUtils::parseIndexTTL(iter->val());
@@ -49,6 +52,9 @@
}
vids.emplace_back(iter->vId());
iter->next();
if (limit_ > 0 && ++count == limit_) {
break;
}
}
for (const auto& vId : vids) {
VLOG(1) << "partId " << partId << ", vId " << vId << ", tagId " << context_->tagId_;
@@ -79,6 +85,7 @@
IndexScanNode<T>* indexScanNode_;
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas_;
const std::string& schemaName_;
int64_t limit_;
std::vector<kvstore::KV> data_;
};
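
IndexEdgeNode and IndexVertexNode take the same optional limit as the scan node, since they walk the scan node's iterator again while collecting the keys whose row data they then read. A construction sketch mirroring buildPlanBasic below; the limit value is illustrative:

  auto vertex = std::make_unique<IndexVertexNode<IndexID>>(
      context_.get(), indexScan.get(), schemas_, context_->tagName_, /*limit=*/100);
  vertex->addDependency(indexScan.get());
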

34 changes: 19 additions & 15 deletions src/storage/index/LookupBaseProcessor-inl.h
@@ -22,10 +22,11 @@ nebula::cpp2::ErrorCode LookupBaseProcessor<REQ, RESP>::requestCheck(
this->planContext_ =
std::make_unique<PlanContext>(this->env_, spaceId_, this->spaceVidLen_, this->isIntId_);
const auto& indices = req.get_indices();
this->planContext_->isEdge_ = indices.get_is_edge();
const auto& schemaId = indices.get_schema_id();
this->planContext_->isEdge_ = schemaId.getType() == nebula::cpp2::SchemaID::Type::edge_type;
this->context_ = std::make_unique<RuntimeContext>(this->planContext_.get());
if (context_->isEdge()) {
context_->edgeType_ = indices.get_tag_or_edge_id();
context_->edgeType_ = schemaId.get_edge_type();
auto edgeName = this->env_->schemaMan_->toEdgeName(spaceId_, context_->edgeType_);
if (!edgeName.ok()) {
return nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND;
@@ -41,7 +42,7 @@ nebula::cpp2::ErrorCode LookupBaseProcessor<REQ, RESP>::requestCheck(
schemas_ = std::move(allEdges).value()[context_->edgeType_];
context_->edgeSchema_ = schemas_.back().get();
} else {
context_->tagId_ = indices.get_tag_or_edge_id();
context_->tagId_ = schemaId.get_tag_id();
auto tagName = this->env_->schemaMan_->toTagName(spaceId_, context_->tagId_);
if (!tagName.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND;
Expand Down Expand Up @@ -76,6 +77,9 @@ nebula::cpp2::ErrorCode LookupBaseProcessor<REQ, RESP>::requestCheck(
}
}

// limit
limit_ = req.get_limit();

return nebula::cpp2::ErrorCode::SUCCEEDED;
}

@@ -276,8 +280,8 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
const std::vector<meta::cpp2::ColumnDef>& fields) {
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();
auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);

auto output = std::make_unique<IndexOutputNode<IndexID>>(
result, context_.get(), indexScan.get(), hasNullableCol, fields);
@@ -310,11 +314,11 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);
if (context_->isEdge()) {
auto edge = std::make_unique<IndexEdgeNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->edgeName_);
context_.get(), indexScan.get(), schemas_, context_->edgeName_, limit_);
edge->addDependency(indexScan.get());
auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), edge.get());
output->addDependency(edge.get());
@@ -323,7 +327,7 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
return output;
} else {
auto vertex = std::make_unique<IndexVertexNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->tagName_);
context_.get(), indexScan.get(), schemas_, context_->tagName_, limit_);
vertex->addDependency(indexScan.get());
auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), vertex.get());
output->addDependency(vertex.get());
@@ -361,8 +365,8 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);

auto filter =
std::make_unique<IndexFilterNode<IndexID>>(indexScan.get(), exprCtx, exp, context_->isEdge());
@@ -412,11 +416,11 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);
if (context_->isEdge()) {
auto edge = std::make_unique<IndexEdgeNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->edgeName_);
context_.get(), indexScan.get(), schemas_, context_->edgeName_, limit_);
edge->addDependency(indexScan.get());
auto filter = std::make_unique<IndexFilterNode<IndexID>>(edge.get(), exprCtx, exp);
filter->addDependency(edge.get());
@@ -429,7 +433,7 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
return output;
} else {
auto vertex = std::make_unique<IndexVertexNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->tagName_);
context_.get(), indexScan.get(), schemas_, context_->tagName_, limit_);
vertex->addDependency(indexScan.get());
auto filter = std::make_unique<IndexFilterNode<IndexID>>(vertex.get(), exprCtx, exp);
filter->addDependency(vertex.get());
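
The processor reads the limit from the request in requestCheck() and threads it into each plan variant: to the IndexScanNode and, where the plan reads row data, to the edge/vertex nodes as well. Since the field is defined as a per-partition cap, one request can still return up to limit times the number of scanned partitions rows in total; a sketch of that bound (an assumption drawn from the field comment in storage.thrift, not code from this commit):

  // Upper bound on rows a single LookupIndexRequest can return (sketch).
  // A limit of 0 means "no limit", so no finite bound applies in that case.
  int64_t maxRowsForRequest(int64_t limit, size_t numParts) {
    return limit > 0 ? limit * static_cast<int64_t>(numParts) : -1;  // -1: unbounded
  }
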
1 change: 1 addition & 0 deletions src/storage/index/LookupBaseProcessor.h
@@ -81,6 +81,7 @@ class LookupBaseProcessor : public BaseProcessor<RESP> {
// Save schemas when column is out of index, need to read from data
std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>> schemas_;
std::vector<size_t> deDupColPos_;
int64_t limit_ = 0;
};

} // namespace storage
5 changes: 3 additions & 2 deletions src/storage/test/DeleteTagsTest.cpp
@@ -58,8 +58,9 @@ cpp2::LookupIndexRequest buildLookupRequest(int32_t totalParts, std::string play
cpp2::LookupIndexRequest req;
nebula::storage::cpp2::IndexSpec indices;
req.set_space_id(1);
indices.set_tag_or_edge_id(1);
indices.set_is_edge(false);
nebula::cpp2::SchemaID schemaId;
schemaId.set_tag_id(1);
indices.set_schema_id(schemaId);
std::vector<PartitionID> parts;
for (PartitionID partId = 1; partId <= totalParts; partId++) {
parts.emplace_back(partId);
29 changes: 17 additions & 12 deletions src/storage/test/IndexWithTTLTest.cpp
@@ -441,7 +441,7 @@ TEST(IndexWithTTLTest, RebuildTagIndexWithTTL) {

// Wait for the task finished
do {
usleep(500);
sleep(1);
} while (!manager_->isFinished(context.jobId_, context.taskId_));

manager_->shutdown();
@@ -510,7 +510,7 @@ TEST(IndexWithTTLTest, RebuildEdgeIndexWithTTL) {

// Wait for the task finished
do {
usleep(500);
sleep(1);
} while (!manager_->isFinished(context.jobId_, context.taskId_));

manager_->shutdown();
@@ -581,7 +581,7 @@ TEST(IndexWithTTLTest, RebuildTagIndexWithTTLExpired) {

// Wait for the task finished
do {
usleep(500);
sleep(1);
} while (!manager_->isFinished(context.jobId_, context.taskId_));

manager_->shutdown();
@@ -652,7 +652,7 @@ TEST(IndexWithTTLTest, RebuildEdgeIndexWithTTLExpired) {

// Wait for the task finished
do {
usleep(500);
sleep(1);
} while (!manager_->isFinished(context.jobId_, context.taskId_));
manager_->shutdown();

@@ -686,8 +686,9 @@ TEST(IndexWithTTLTest, LookupTagIndexWithTTL) {
cpp2::LookupIndexRequest req;
nebula::storage::cpp2::IndexSpec indices;
req.set_space_id(1);
indices.set_tag_or_edge_id(2021001);
indices.set_is_edge(false);
nebula::cpp2::SchemaID schemaId;
schemaId.set_tag_id(2021001);
indices.set_schema_id(schemaId);
std::vector<PartitionID> parts;
for (int32_t p = 1; p <= 6; p++) {
parts.emplace_back(p);
@@ -729,8 +730,10 @@ TEST(IndexWithTTLTest, LookupEdgeIndexWithTTL) {
cpp2::LookupIndexRequest req;
nebula::storage::cpp2::IndexSpec indices;
req.set_space_id(1);
indices.set_tag_or_edge_id(2021001);
indices.set_is_edge(true);
nebula::cpp2::SchemaID schemaId;
schemaId.set_edge_type(2021001);
indices.set_schema_id(schemaId);

std::vector<PartitionID> parts;
for (int32_t p = 1; p <= 6; p++) {
parts.emplace_back(p);
@@ -774,8 +777,9 @@ TEST(IndexWithTTLTest, LookupTagIndexWithTTLExpired) {
cpp2::LookupIndexRequest req;
nebula::storage::cpp2::IndexSpec indices;
req.set_space_id(1);
indices.set_tag_or_edge_id(2021001);
indices.set_is_edge(false);
nebula::cpp2::SchemaID schemaId;
schemaId.set_tag_id(2021001);
indices.set_schema_id(schemaId);
std::vector<PartitionID> parts;
for (int32_t p = 1; p <= 6; p++) {
parts.emplace_back(p);
@@ -819,8 +823,9 @@ TEST(IndexWithTTLTest, LookupEdgeIndexWithTTLExpired) {
cpp2::LookupIndexRequest req;
nebula::storage::cpp2::IndexSpec indices;
req.set_space_id(1);
indices.set_tag_or_edge_id(2021001);
indices.set_is_edge(true);
nebula::cpp2::SchemaID schemaId;
schemaId.set_edge_type(2021001);
indices.set_schema_id(schemaId);
std::vector<PartitionID> parts;
for (int32_t p = 1; p <= 6; p++) {
parts.emplace_back(p);
(Diff for the remaining changed file was not loaded.)