Skip to content

Commit

Permalink
fix_batch_insert_problem (#3627)
Browse files Browse the repository at this point in the history
* filter data before batch insert

* add test cases

* add more testcase
  • Loading branch information
nevermore3 authored Jan 6, 2022
1 parent 71f66e9 commit fd3f925
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 21 deletions.
51 changes: 45 additions & 6 deletions src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
dummyLock.reserve(newEdges.size());
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;

std::unordered_set<std::string> visited;
visited.reserve(newEdges.size());
deleteDupEdge(const_cast<std::vector<cpp2::NewEdge>&>(newEdges));
for (auto& newEdge : newEdges) {
auto edgeKey = *newEdge.key_ref();
auto l = std::make_tuple(spaceId_,
Expand Down Expand Up @@ -203,9 +202,6 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
*edgeKey.edge_type_ref(),
*edgeKey.ranking_ref(),
edgeKey.dst_ref()->getStr());
if (ifNotExists_ && !visited.emplace(key).second) {
continue;
}
auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, std::abs(*edgeKey.edge_type_ref()));
if (!schema) {
LOG(ERROR) << "Space " << spaceId_ << ", Edge " << *edgeKey.edge_type_ref() << " invalid";
Expand Down Expand Up @@ -439,7 +435,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> AddEdgesProcessor::addEdges(
}
}
/*
* step 3 , Insert new vertex data
* step 3 , Insert new edge data
*/
auto key = e.first;
auto prop = e.second;
Expand Down Expand Up @@ -488,5 +484,48 @@ std::vector<std::string> AddEdgesProcessor::indexKeys(
std::move(values).value());
}

/*
 * Remove duplicate edges from a batch insert request, in place.
 *
 * Two edges are duplicates when they share the same
 * (src, edge type, ranking, dst). The relative order of the surviving
 * edges is left unchanged.
 *
 * ifNotExists_ is true:  only the first edge with a given edge key is kept.
 * ifNotExists_ is false: only the last edge with a given edge key is kept.
 *
 * Survivors are compacted in a single pass and the leftover slots are
 * removed with one range erase, instead of erasing duplicates one by one
 * (which would shift the tail of the vector for every duplicate).
 */
void AddEdgesProcessor::deleteDupEdge(std::vector<cpp2::NewEdge>& edges) {
  std::unordered_set<std::string> visited;
  visited.reserve(edges.size());

  // Returns true when an edge with the same edge key has already been seen.
  // The partition id is fixed to 0 on purpose: the encoded key is only used
  // to tell different edge keys apart, it is never written to storage here.
  const auto isDuplicate = [this, &visited](const cpp2::NewEdge& edge) {
    auto edgeKeyRef = edge.key_ref();
    auto key = NebulaKeyUtils::edgeKey(spaceVidLen_,
                                       0,
                                       edgeKeyRef->src_ref()->getStr(),
                                       edgeKeyRef->get_edge_type(),
                                       edgeKeyRef->get_ranking(),
                                       edgeKeyRef->dst_ref()->getStr());
    return !visited.emplace(std::move(key)).second;
  };

  if (ifNotExists_) {
    // Keep the first occurrence: scan front to back, packing survivors
    // towards the front of the vector.
    auto write = edges.begin();
    for (auto read = edges.begin(); read != edges.end(); ++read) {
      if (isDuplicate(*read)) {
        continue;
      }
      if (write != read) {
        *write = std::move(*read);
      }
      ++write;
    }
    edges.erase(write, edges.end());
  } else {
    // Keep the last occurrence: scan back to front, packing survivors
    // towards the back of the vector.
    auto write = edges.rbegin();
    for (auto read = edges.rbegin(); read != edges.rend(); ++read) {
      if (isDuplicate(*read)) {
        continue;
      }
      if (write != read) {
        *write = std::move(*read);
      }
      ++write;
    }
    // Survivors now occupy [write.base(), end()); drop everything before them.
    edges.erase(edges.begin(), write.base());
  }
}

} // namespace storage
} // namespace nebula
2 changes: 2 additions & 0 deletions src/storage/mutate/AddEdgesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class AddEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema);

void deleteDupEdge(std::vector<cpp2::NewEdge>& edges);

private:
GraphSpaceID spaceId_;
std::vector<std::shared_ptr<nebula::meta::cpp2::IndexItem>> indexes_;
Expand Down
40 changes: 33 additions & 7 deletions src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;

// cache tagKey
std::unordered_set<std::string> visited;
visited.reserve(vertices.size());
deleteDupVid(const_cast<std::vector<cpp2::NewVertex>&>(vertices));
for (auto& vertex : vertices) {
auto vid = vertex.get_id().getStr();
const auto& newTags = vertex.get_tags();
Expand Down Expand Up @@ -181,9 +180,6 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
}

auto key = NebulaKeyUtils::tagKey(spaceVidLen_, partId, vid, tagId);
if (ifNotExists_ && !visited.emplace(key).second) {
continue;
}
auto props = newTag.get_props();
auto iter = propNamesMap.find(tagId);
std::vector<std::string> propNames;
Expand Down Expand Up @@ -232,8 +228,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
if (oReader != nullptr) {
auto ois = indexKeys(partId, vid, oReader.get(), index, schema.get());
if (!ois.empty()) {
// Check the index is building for the specified partition or
// not.
// Check the index is building for the specified partition or not
auto indexState = env_->getIndexState(spaceId_, partId);
if (env_->checkRebuilding(indexState)) {
auto delOpKey = OperationKeyUtils::deleteOperationKey(partId);
Expand Down Expand Up @@ -345,5 +340,36 @@ std::vector<std::string> AddVerticesProcessor::indexKeys(
spaceVidLen_, partId, index->get_index_id(), vId, std::move(values).value());
}

/*
 * Remove vertices with a duplicated vid from a batch insert request,
 * in place. The relative order of the surviving vertices is left unchanged.
 *
 * ifNotExists_ is true:  only the first vertex with a given vid is kept.
 * ifNotExists_ is false: only the last vertex with a given vid is kept.
 *
 * Survivors are compacted in a single pass and the leftover slots are
 * removed with one range erase, instead of erasing duplicates one by one
 * (which would shift the tail of the vector for every duplicate).
 */
void AddVerticesProcessor::deleteDupVid(std::vector<cpp2::NewVertex>& vertices) {
  std::unordered_set<std::string> visited;
  visited.reserve(vertices.size());

  // Returns true when a vertex with the same vid has already been seen.
  const auto isDuplicate = [&visited](const cpp2::NewVertex& vertex) {
    return !visited.emplace(vertex.get_id().getStr()).second;
  };

  if (ifNotExists_) {
    // Keep the first occurrence: scan front to back, packing survivors
    // towards the front of the vector.
    auto write = vertices.begin();
    for (auto read = vertices.begin(); read != vertices.end(); ++read) {
      if (isDuplicate(*read)) {
        continue;
      }
      if (write != read) {
        *write = std::move(*read);
      }
      ++write;
    }
    vertices.erase(write, vertices.end());
  } else {
    // Keep the last occurrence: scan back to front, packing survivors
    // towards the back of the vector.
    auto write = vertices.rbegin();
    for (auto read = vertices.rbegin(); read != vertices.rend(); ++read) {
      if (isDuplicate(*read)) {
        continue;
      }
      if (write != read) {
        *write = std::move(*read);
      }
      ++write;
    }
    // Survivors now occupy [write.base(), end()); drop everything before them.
    vertices.erase(vertices.begin(), write.base());
  }
}

} // namespace storage
} // namespace nebula
2 changes: 2 additions & 0 deletions src/storage/mutate/AddVerticesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class AddVerticesProcessor : public BaseProcessor<cpp2::ExecResponse> {
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema);

void deleteDupVid(std::vector<cpp2::NewVertex>& vertices);

private:
GraphSpaceID spaceId_;
std::vector<std::shared_ptr<nebula::meta::cpp2::IndexItem>> indexes_;
Expand Down
Loading

0 comments on commit fd3f925

Please sign in to comment.